from __future__ import annotations
import tempfile
import warnings
from pathlib import Path
from typing import Any, Literal
import numpy as np
import uxarray as uxr
from matplotlib.axes import Axes
from numpy.typing import ArrayLike, NDArray
from scipy.spatial.transform import Rotation
from cratermaker.components.morphology import Morphology
from cratermaker.components.scaling import Scaling
from cratermaker.components.surface import LocalSurface, Surface
from cratermaker.components.target import Target
from cratermaker.constants import FloatLike, PairOfFloats
from cratermaker.utils.general_utils import format_large_units, parameter, validate_and_normalize_location
[docs]
@Surface.register("hireslocal")
class HiResLocalSurface(Surface):
"""
Create a uniform grid configuration with the given pixel size.
Parameters
----------
pix : FloatLike
The approximate face size inside the local region in meters.
local_radius : FloatLike
The radius of the local region in meters.
local_location : PairOfFloats | None
The longitude and latitude of the location in degrees. If None, it will be set to (0.0, 0.0).
superdomain_scale_factor : FloatLike, optional
A factor defining relative size of the face at the antipode of the local region to the face size inside the local region.
If not provided, construction of the surface will be deferred until the `set_superdomain` method is called.
If a negative number is provided, it will be computed based on a provided (or default) scaling and morphology model, and will be set so that smallest craters that can be resolved on faces outside the local region could potentially deposit ejecta at the boundary of the local region.
Default is -1 (which triggers automatic computation based on scaling and morphology models).
target : Target, optional
The target body or name of a known target body for the impact simulation. If none provide, it will be either set to the default,
or extracted from the scaling model if it is provied
reset : bool, optional
Flag to indicate whether to reset the surface. Default is True.
regrid : bool, optional
Flag to indicate whether to regrid the surface. Default is False.
simdir : str | Path
|simdir|
Returns
-------
HiResLocalSurface
An instance of the HiResLocalSurface clas initialized with the given point distribution
"""
def __init__(
self,
pix: FloatLike,
local_radius: FloatLike,
local_location: PairOfFloats | None = None,
superdomain_scale_factor: FloatLike | None = None,
target: Target | str | None = None,
reset: bool = True,
regrid: bool = False,
simdir: str | Path | None = None,
**kwargs: Any,
):
object.__setattr__(self, "_local", None)
object.__setattr__(self, "_superdomain_scale_factor", None)
object.__setattr__(self, "_superdomain_function_slope", None)
object.__setattr__(self, "_superdomain_function_exponent", None)
object.__setattr__(self, "_local_grid_indices_file_extension", "npz")
super().__init__(target=target, simdir=simdir, **kwargs)
self.pix = pix
self.local_radius = local_radius
if local_location is None:
local_location = (0.0, 0.0)
self.local_location = local_location
if superdomain_scale_factor is not None:
if superdomain_scale_factor < 0:
self.set_superdomain(reset=reset, regrid=regrid, **kwargs)
else:
self.superdomain_scale_factor = superdomain_scale_factor
self._load_from_files(reset=reset, regrid=regrid, **kwargs)
return
def __str__(self) -> str:
str_repr = super().__str__()
pix = format_large_units(self.pix, quantity="length")
pix_min = format_large_units(self.pix_min, quantity="length")
pix_max = format_large_units(self.pix_max, quantity="length")
local_radius = format_large_units(self.local_radius, quantity="length")
str_repr += (
f"Local pixel size: {pix}"
f"Local Radius: {local_radius}\n"
f"Local Location: ({self.local_location[0]:.2f}°, {self.local_location[1]:.2f}°)\n"
f"Minimum effective pixel size: {pix_min}\n"
f"Maximum effective pixel size: {pix_max}\n"
f"Number of local faces: {self.local.n_face}\n"
f"Number of local nodes: {self.local.n_node}\n"
f"Number of superdomain faces: {self.n_face - self.local.n_face}\n"
f"Number of superdomain nodes: {self.n_node - self.local.n_node}\n"
)
return str_repr
[docs]
def reset(
self,
**kwargs: Any,
) -> None:
"""
Resets the surface by removing all saved output files (except for the grid).
This will also reset the local surface.
"""
super().reset(**kwargs)
if self._local is not None:
self._local.reset(**kwargs)
return
[docs]
def superdomain_function(self, r):
"""
Defines the superdomain scale factor based on the distance from the local boundary.
This is set so that smallest craters that are modeled outside the local region are those whose ejecta could just reach the boundary.
It is a piecewise function that returns the local pixel size inside the local region and a power law function outside. The slope and exponent of the power law is linear if superdomain_scale_factor is set explicitly, otherwise it is computed by the `set_superdomain` method.
Parameters
----------
r : FloatLike
The distance from the local center in meters.
Returns
-------
FloatLike
The effective pixel size at the given distance from the local center.
"""
return np.where(
r < self.local_radius,
self.pix,
self.pix + self.superdomain_function_slope * (r - self.local_radius) ** self.superdomain_function_exponent,
)
[docs]
def save(
self,
interval: int = 0,
time_variables: dict | None = None,
**kwargs,
) -> None:
"""
Save the surface data to the specified directory.
Each data variable is saved to a separate NetCDF file. If 'time_variables' is specified, then a one or more variables will be added to the dataset along the time dimension. If 'interval' is included as a key in `time_variables`, then this will be appended to the data file name.
Parameters
----------
interval : int, optional
Interval number to append to the data file name. Default is 0.
time_variables : dict, optional
Dictionary containing one or more variable name and value pairs. These will be added to the dataset along the time dimension. Default is None.
"""
self._full().save(
interval=interval,
time_variables=time_variables,
**kwargs,
)
if not self.local.grid_file.exists():
self.local._write_grid_file()
return
[docs]
def export(
self,
driver: str = "GPKG",
interval: int | None = None,
superdomain: bool = False,
ask_overwrite: bool | None = None,
**kwargs: Any,
) -> None:
"""
Export the surface data to the specified format.
Parameters
----------
driver : str, optional
The driver to use export the data to. Supported formats are 'VTK' or a driver supported by GeoPandas ('GPKG', 'ESRI Shapefile', etc.).
interval : int | None, optional
The interval number to export. If None, all intervals currently saved will be exported. Default is None.
superdomain : bool, optional
If True, export the full surface including the superdomain. If False, export only the local region. Default is False.
ask_overwrite : bool | None, optional
|ask_overwrite_methods|
**kwargs : Any
|kwargs|
"""
if superdomain:
self._full().export(
driver=driver,
interval=interval,
ask_overwrite=ask_overwrite,
**kwargs,
)
self.local.export(
driver=driver,
interval=interval,
ask_overwrite=ask_overwrite,
**kwargs,
)
return
[docs]
def plot(
self,
plot_style: Literal["map", "hillshade"] = "map",
variable_name: str | None = None,
interval: int | None = None,
cmap: str | None = None,
label: str | None = None,
scalebar: bool | None = None,
colorbar: bool = True,
show: bool = True,
save: bool = False,
ax: Axes | None = None,
close_when_done: bool | None = None,
minimum_plot_width: float | None = 800.0,
superdomain: bool = False,
**kwargs: Any,
) -> None:
"""
Plot an image of the surface.
Parameters
----------
plot_style : str, optional
The style of the plot. Options are "map" and "hillshade". In "map" mode, the variable is displayed as a colored map. In "hillshade" mode, a hillshade image is generated using "face_elevation" data. If a different variable is passed to `variable`, then the hillshade will be overlayed with that variable's data. Default is "map".
variable_name : str | None, optional
The variable to plot. If None is provided then "face_elevation" is used in "map" mode.
interval: int | None, optional
The interval number of the surface to plot. If None, the currently loaded surface data will be used.
cmap : str, optional
The colormap to use for the plot. If None, a default colormap will be used ("cividis" by default and "grey" when plot_style=="hillshade" and variable=="face_elevation").
label : str | None, optional
A label for the plot. If None, no label will be added.
scalebar : bool, optional
If True, a scalebar will be added to the plot. Default is True.
colorbar : bool, optional
If True, a colorbar will be added to the plot when using "map" plot_style or "hillshade" with a variable overlay. Default is True.
show : bool, optional
If True, the plot will be displayed. Default for local surfaces is True.
save : bool, optional
If True, the plot will be saved to the default plot directory. Default is False.
ax : matplotlib.axes.Axes, optional
An existing Axes object to plot on. If None, a new figure and axes will be created.
close_when_done : bool, optional
If True, the figure will be closed after plotting. Default is True when save is True and show is False, and False otherwise.
minimum_plot_width : float, optional
Because the width of the plot is determined by the number of faces, small regions will generate small plots with labels that are hard to read. This parameter sets a lower limit to the width of the image that is generated by the plot. By default it is 800. Set to None to turn it off.
superdomain : bool, optional
If True, plot the full surface including the superdomain. If False, plot only the local region. Default is False.
**kwargs : Any
|kwargs|
"""
if superdomain:
if scalebar is None:
scalebar = False
return self._full().plot(
plot_style=plot_style,
variable_name=variable_name,
interval=interval,
cmap=cmap,
label=label,
scalebar=scalebar,
colorbar=colorbar,
show=show,
save=save,
ax=ax,
close_when_done=close_when_done,
minimum_plot_width=minimum_plot_width,
**kwargs,
)
else:
if scalebar is None:
scalebar = True
return self.local.plot(
plot_style=plot_style,
variable_name=variable_name,
interval=interval,
cmap=cmap,
label=label,
scalebar=scalebar,
colorbar=colorbar,
save=save,
show=show,
ax=ax,
close_when_done=close_when_done,
minimum_plot_width=minimum_plot_width,
**kwargs,
)
[docs]
def to_raster(self, variable_name: str = "face_elevation", superdomain: bool = False, **kwargs: Any):
"""
Rasterize a face-based variable into a 2D raster using rasterio.
Parameters
----------
variable_name : str, optional
The name of the variable to rasterize. Default is "face_elevation".
**kwargs : Any
|kwargs|
Returns
-------
raster : NDArray[np.float32]
The rasterized variable as a 2D numpy array.
extent : tuple[float, float, float, float]
The extent of the raster in the format (xmin, xmax, ymin, ymax).
transform : Affine
The affine transform for the raster.
crs : CRS
The coordinate reference system of the raster.
"""
if superdomain:
return self._full().to_raster(variable_name, **kwargs)
else:
return self.local.to_raster(variable_name, **kwargs)
[docs]
def to_geotiff_file(
self,
interval: int | None = None,
variable_name: str = "face_elevation",
superdomain: bool = False,
**kwargs,
) -> None:
"""
Rasterize a face-based elevation variable into a GeoTIFF using rasterio.
Parameters
----------
interval : int | None, optional
The interval number to export. Default is None, which will export all saved intervals.
variable_name : str, optional
The name of the variable to rasterize. Default is "face_elevation".
superdomain : bool, optional
If True, export the full surface including the superdomain. If False, export only the local region. Default is False.
**kwargs : Any
|kwargs|
"""
if superdomain:
return self._full().to_geotiff_file(
interval=interval,
variable_name=variable_name,
**kwargs,
)
else:
return self.local.to_geotiff_file(
interval=interval,
variable_name=variable_name,
**kwargs,
)
[docs]
def pyvista_plotter(
self, variable_name: str | None = None, variable: ArrayLike | None = None, superdomain: bool = False, **kwargs
):
"""
Show the surface using an interactive 3D plot.
Parameters
----------
engine : str, optional
The engine to use for plotting. Currently, only "pyvista" is supported. Default is "pyvista".
variable_name : str | None, optional
The name of the variable to plot. If the name of the variable is not already stored on the surface mesh, then the `variable` argument must also be passed. Default is None, which will plot a greyscale image of the surface.
variable : (n_face) array, optional
An array face values that will be used to color the surface mesh. This is required if `variable_name` is not stored on the mesh.
superdomain : bool, optional
If True, show the full surface including the superdomain. If False, show only the local region. Default is False.
**kwargs : Any
|kwargs|
Returns
-------
pyvista.Plotter
The PyVista Plotter object for further customization.
"""
if superdomain:
return self._full().pyvista_plotter(variable=variable, variable_name=variable_name, **kwargs)
else:
return self.local.pyvista_plotter(variable=variable, variable_name=variable_name, **kwargs)
[docs]
def set_superdomain(
self,
morphology: Morphology | str | None = None,
reset: bool = False,
regrid: bool = False,
**kwargs: Any,
):
"""
Set the superdomain scale factor based on the scaling and morphology models.
This sets the cell size at the antipode such that ejecta from a crater of that size just reaches the local region.
Parameters
----------
morphology : Morphology | str | None, optional
The morphology model to use. If None, the default morphology model will be used.
reset : bool, optional
Flag to indicate whether to reset the surface. Default is False.
regrid : bool, optional
Flag to indicate whether to regrid the surface. Default is False.
**kwargs : Any
|kwargs|
"""
from scipy.optimize import curve_fit
from cratermaker import Crater
morphology = Morphology.maker(morphology, surface=self, target=self.target, **kwargs)
antipode_distance = np.pi * self.target.radius
projectile_velocity = morphology.scaling.projectile.mean_velocity * 10
distance = 1.0
dvals = []
sdvals = []
superdomain_size = distance * 0.1
while distance < antipode_distance:
for diameter in np.logspace(
np.log10(superdomain_size),
np.log10(self.target.radius * 2),
1000,
):
crater = morphology.Crater.maker(
diameter=diameter,
angle=90.0,
projectile_velocity=projectile_velocity,
)
rmax = morphology.rmax(crater=crater, minimum_thickness=1e-3)
if rmax >= distance:
superdomain_size = crater.diameter
break
dvals.append(distance)
sdvals.append(superdomain_size)
distance = distance + superdomain_size
def plaw(x, a, b):
return a * x**b
try:
popt = curve_fit(plaw, dvals, sdvals, p0=[1.0, 1.0])[0]
self._superdomain_function_slope = popt[0]
self._superdomain_function_exponent = popt[1]
except Exception:
print("Could not fit superdomain function, using default values.")
self._superdomain_function_slope = 1.0
self._superdomain_function_exponent = 1.0
self._superdomain_scale_factor = self.superdomain_function(antipode_distance)
self._load_from_files(reset=reset, regrid=regrid, **kwargs)
return
[docs]
def set_face_proj(self):
"""
Set the face projection for the local surface.
"""
if self.is_local:
self.local.set_face_proj()
return
[docs]
def set_node_proj(self):
"""
Set the node projection for the local surface.
"""
if self.is_local:
self.local.set_node_proj()
return
def _rotate_point_cloud(self, points):
"""
Rotate a point cloud so that the point at [0,0,1] moves to (lon,lat).
The following convention is used:
- Longitude [-180, 180] degrees, increasing eastward.
- Latitude [-90, 90] degrees, increasing northward.
- (0,0) lon/lat corresponds to [0,0,1] in Cartesian space.
Parameters
----------
points (np.ndarray): Nx3 array of (x,y,z) points.
Returns
-------
np.ndarray: Rotated Nx3 point cloud.
"""
# Convert target lon, lat to radians
lon_rad, lat_rad = (
np.radians(self.local_location[0]),
np.radians(self.local_location[1]),
)
# Compute target unit vector
target = np.array(
[
np.cos(lat_rad) * np.cos(lon_rad),
np.cos(lat_rad) * np.sin(lon_rad),
np.sin(lat_rad),
]
)
# Original vector is the north pole
original = np.array([0, 0, 1])
axis = np.cross(original, target)
axis_norm = np.linalg.norm(axis)
if axis_norm < 1e-10:
return points
axis /= axis_norm
angle = np.arccos(np.clip(np.dot(original, target), -1.0, 1.0))
rotvec = axis * angle
rotation = Rotation.from_rotvec(rotvec)
return rotation.apply(points)
def _generate_face_distribution(self, **kwargs: Any) -> NDArray:
"""
Creates the points that define the mesh centers.
Returns
-------
(3,n) ndarray of np.float64
Array of points on a unit sphere.
"""
def _interior_distribution(theta):
arc_distance = self.radius * theta
delta = self.pix / self.radius
if arc_distance < self.pix:
return [], theta + delta
if np.pi * self.radius - arc_distance < self.pix:
return [], np.pi
max_extent = arc_distance / self.radius
coords = np.arange(-max_extent, max_extent + delta, delta)
lat_grid, lon_grid = np.meshgrid(coords, coords, indexing="ij")
pix_distance_sq = (lon_grid / delta) ** 2 + (lat_grid / delta) ** 2
inner = (arc_distance / self.pix - 1.0) ** 2
outer = (arc_distance / self.pix) ** 2
mask = (pix_distance_sq >= inner) & (pix_distance_sq < outer)
lat = lat_grid[mask]
lon = lon_grid[mask]
cos_lat = np.cos(lat)
sin_lat = np.sin(lat)
cos_lon = np.cos(lon)
sin_lon = np.sin(lon)
z = cos_lat * cos_lon
x = cos_lat * sin_lon
y = sin_lat
points = np.column_stack([x, y, z])
return points.tolist(), theta + delta
def _exterior_distribution(theta):
r = self.radius
arc_distance = r * theta
pix_local = self.superdomain_function(arc_distance)
if (np.pi * r - arc_distance) < pix_local:
return [], np.pi
theta_next = theta + pix_local / r
sin_theta = np.sin(theta)
cos_theta = np.cos(theta)
n_phi = max(1, int(round(2 * np.pi * r * sin_theta / pix_local)))
phi = np.linspace(0, 2 * np.pi, n_phi, endpoint=False)
x = sin_theta * np.cos(phi)
y = sin_theta * np.sin(phi)
z = np.full_like(phi, cos_theta)
points = np.column_stack((x, y, z))
return points.tolist(), theta_next
print(f"Center of local region: {self.local_location}")
print(f"Radius of local region: {format_large_units(self.local_radius, quantity='length')}")
print(f"Local region pixel size: {format_large_units(self.pix, quantity='length')}")
interior_points = []
theta = 0.0
while theta <= self.local_radius / self.radius:
new_points, theta = _interior_distribution(theta)
interior_points.extend(new_points)
print(f"Generated {len(interior_points)} points in the local region.")
exterior_points = []
while theta < np.pi:
new_points, theta = _exterior_distribution(theta)
exterior_points.extend(new_points)
print(f"Generated {len(exterior_points)} points in the superdomain region.")
points = interior_points + exterior_points
decimals = max(6, -int(np.floor(np.log10(self.pix / self.radius))))
points = np.array(points, dtype=np.float64)
points = np.round(points, decimals=decimals)
points = np.unique(points, axis=0)
points = self._rotate_point_cloud(points).T # rotates from the north pole to local_location
return points
def _load_from_files(self, reset: bool = False, **kwargs: Any):
is_same_grid = self._is_same_grid
ask_overwrite = self.ask_overwrite # Store the current value so we can temporarily turn it off for the reset
super()._load_from_files(reset=reset, **kwargs)
if is_same_grid and self.local_grid_indices_file is not None and Path(self.local_grid_indices_file).exists():
with np.load(self.local_grid_indices_file) as grid_data:
face_indices = grid_data["face_indices"]
node_indices = grid_data["node_indices"]
edge_indices = grid_data["edge_indices"]
self._local = LocalHiResLocalSurface(
surface=self,
face_indices=face_indices,
node_indices=node_indices,
edge_indices=edge_indices,
location=self.local_location,
region_radius=self.local_radius,
**vars(self.common_args),
)
self._set_local_identifiers()
else:
_ = self.local # triggers extraction of local surface and saving of local grid indices file
self.ask_overwrite = False
if reset:
self.reset(reset=reset, **kwargs)
self.ask_overwrite = ask_overwrite # Restore the original value of ask_overwrite after the reset
return
[docs]
def compute_location_from_distance_bearing(
self,
distance: FloatLike | ArrayLike,
bearing: FloatLike | ArrayLike,
reference_location: PairOfFloats | None = None,
) -> NDArray[np.float64]:
"""
Calculate the longitude and latitude of one or more points given a reference point, initial bearings, and distances.
Parameters
----------
bearings : FloatLike or ArrayLike
Initial bearing from the reference point to the target point or points in degrees.
distances : FloatLike or ArrayLike
Great circle distance from the reference point to the target point or points in meters.
reference_location : PairOfFloats, optional
Longitude and latitude of the reference point in degrees. Default is the value of `local_location`
Returns
-------
NDArray
Longitude and latitude of the target point or points in degrees.
"""
if reference_location is None:
reference_location = self.local_location
return super().compute_location_from_distance_bearing(
distance=distance, bearing=bearing, reference_location=reference_location
)
@property
def local(self) -> LocalSurface:
"""
Returns the local view of the surface.
"""
if self._local is None:
self._local = self.extract_region(location=self.local_location, region_radius=self.local_radius)
self.local.grid_file.unlink(missing_ok=True)
self.local_grid_indices_file.unlink(missing_ok=True)
np.savez_compressed(
file=self.local_grid_indices_file,
face_indices=self.local.face_indices,
node_indices=self.local.node_indices,
edge_indices=self.local.edge_indices,
allow_pickle=False,
)
self._set_local_identifiers()
return self._local
def _set_local_identifiers(self):
"""
Set the local identifiers for the local surface.
"""
if self._local is not None:
self.add_data(
name="is_local_face",
data=np.isin(np.arange(self.n_face), self.local.face_indices),
long_name="Is local face",
units=None,
dtype=bool,
overwrite=True,
)
self.add_data(
name="is_local_node",
data=np.isin(np.arange(self.n_node), self.local.node_indices),
long_name="Is local node",
units=None,
dtype=bool,
overwrite=True,
)
return
@parameter
def pix(self) -> FloatLike:
"""
The approximate face size for a cell of the mesh.
"""
return self._pix
@pix.setter
def pix(self, value: FloatLike):
if not isinstance(value, FloatLike) or np.isnan(value) or np.isinf(value) or value <= 0:
raise TypeError("pix must be a positive float")
self._pix = value
@parameter
def local_radius(self) -> FloatLike:
"""
The radius of the local region in meters.
"""
return self._local_radius
@local_radius.setter
def local_radius(self, value: FloatLike):
if not isinstance(value, FloatLike) or np.isnan(value) or np.isinf(value) or value <= 0:
raise TypeError("local_radius must be a positive float")
if value > np.pi * self.radius:
raise ValueError("local_radius must be less than pi * radius of the target body")
if value < self.pix:
raise ValueError("local_radius must be greater than or equal to pix (the approximate face size in the local region")
self._local_radius = value
@parameter
def local_location(self) -> PairOfFloats:
"""
The longitude and latitude of the location in degrees.
"""
return self._local_location
@local_location.setter
def local_location(self, value: PairOfFloats):
if not isinstance(value, tuple | list | np.ndarray) or len(value) != 2:
raise TypeError("local_location must be a tuple of two floats")
self._local_location = validate_and_normalize_location(value)
@property
def local_grid_indices_file(self) -> Path:
"""
The path to the local grid indices file.
"""
return self.output_dir / f"local_{self._grid_file_prefix}_indices.{self._local_grid_indices_file_extension}"
@property
def superdomain_function_slope(self):
if self._superdomain_function_slope is None:
d0 = self.local_radius
d1 = np.pi * self.radius - self.local_radius
pix_lo = self.pix
pix_hi = self.pix * self.superdomain_scale_factor
self._superdomain_function_slope = (pix_hi - pix_lo) / (d1 - d0)
return self._superdomain_function_slope
@property
def superdomain_function_exponent(self):
if self._superdomain_function_exponent is None:
self._superdomain_function_exponent = 1.0
return self._superdomain_function_exponent
@parameter
def superdomain_scale_factor(self):
"""
A factor defining the ratio of cell size to the distance from the local boundary.
This is set so that smallest craters that are modeled outside the local region are those whose ejecta could just reach the boundary.
"""
return self._superdomain_scale_factor
@superdomain_scale_factor.setter
def superdomain_scale_factor(self, value: FloatLike):
if not isinstance(value, FloatLike) or np.isnan(value) or np.isinf(value) or value < 1.0:
raise TypeError("superdomain_scale_factor must be a positive float greater than or equal to 1")
self._superdomain_scale_factor = value
@property
def _hashvars(self):
"""
The variables used to generate the hash.
"""
return super()._hashvars + [self.pix, self.local_radius, self.local_location, self.superdomain_scale_factor]
[docs]
class LocalHiResLocalSurface(LocalSurface):
"""
Generates a regional view of a subset of the surface mesh without making copies of any of the data.
Parameters
----------
surface : Surface
The surface object that contains the mesh data.
face_indices : NDArray | slice
The indices of the faces to include in the view.
node_indices : NDArray | slice | None, optional
The indices of the nodes to include in the view. If None, all nodes connected to the faces will be extracted when required
edge_indices : NDArray | slice | None, optional
The indices of the edges to include in the view. If None, all edges connected to the faces will be extracted when required.
location : tuple[float, float] | None, optional
The location of the center of the view in degrees. This is intended to be passed via the extract_region method of Surface.
region_radius : FloatLike | None, optional
The radius of the region to include in the view in meters. This is intended to be passed via the extract_region method of Surface.
"""
def __init__(
self,
surface: Surface | LocalSurface,
face_indices: NDArray | slice | None = None,
node_indices: NDArray | slice | None = None,
edge_indices: NDArray | slice | None = None,
location: tuple[float, float] | None = None,
region_radius: FloatLike | None = None,
**kwargs: Any,
):
if isinstance(surface, LocalSurface):
face_indices = surface.face_indices
node_indices = surface.node_indices
edge_indices = surface.edge_indices
location = surface.location
region_radius = surface.region_radius
surface = surface.surface
object.__setattr__(self, "_local_overlap", None)
object.__setattr__(self, "_edge_mask", None)
object.__setattr__(self, "_face_mask", None)
object.__setattr__(self, "_node_mask", None)
super().__init__(
surface=surface,
face_indices=face_indices,
node_indices=node_indices,
edge_indices=edge_indices,
location=location,
region_radius=region_radius,
reset=False,
**kwargs,
)
return
[docs]
def add_tag(
self,
name: str,
tag: int | None = None,
long_name: str | None = None,
tag_superdomain: bool = False,
**kwargs: Any,
) -> None:
"""
Adds an integer tag to the surface with an option to tag faces in the superdomain.
Parameters
----------
name : str
The name of the tag to be added.
tag : int | None
The integer to apply the tag. If None is provided, the tag layers will be reset to zero
long_name : str | None
The long name of the tag to be added. If None, no long name will be added.
tag_superdomain: bool = False,
If True, apply the tag to surface including the superdomain. If False, apply only to the local region. Default is False.
**kwargs : Any
|kwargs|
"""
if not tag_superdomain:
if self.local_overlap:
return self.local_overlap.add_tag(name=name, tag=tag, long_name=long_name, tag_superdomain=False, **kwargs)
else:
return None
else:
return super().add_tag(name=name, tag=tag, long_name=long_name, **kwargs)
[docs]
def apply_diffusion(self, kdiff: FloatLike | NDArray) -> None:
"""
Apply diffusion to the surface.
Parameters
----------
kdiff : float or array-like
The degradation state of the surface, which is the product of diffusivity and time. It can be a scalar or an array of the same size as the number of faces in the grid.
If it is a scalar, the same value is applied to all faces. If it is an array, it must have the same size as the number of faces in the grid.
The value of kdiff must be greater than 0.0.
Notes
-----
This method only operates on the faces that overlap with the local region of the surface.
"""
if self.local_overlap:
if not np.isscalar(kdiff):
kdiff = kdiff[self.face_mask]
self.local_overlap.apply_diffusion(kdiff)
return
[docs]
def slope_collapse(self, critical_slope_angle: FloatLike = 35.0) -> NDArray:
"""
Collapse all slopes larger than the critical slope angle.
Parameters
----------
critical_slope_angle : float
The critical slope angle (angle of repose) in degrees.
Notes
-----
This method only operates on the faces that overlap with the local region of the surface.
"""
if self.local_overlap:
self.local_overlap.slope_collapse(critical_slope_angle=critical_slope_angle)
return
[docs]
def apply_noise(
self,
model: str = "turbulence",
noise_width: FloatLike = 1000e3,
noise_height: FloatLike = 1e3,
**kwargs: Any,
) -> None:
"""
Apply noise to the node elevations of the surface view.
Parameters
----------
noise_width : float
The spatial wavelength of the noise.
noise_height : float
The amplitude of the noise.
Notes
-----
This method only operates on the faces that overlap with the local region of the surface.
"""
if self.local_overlap:
self.local_overlap.apply_noise(
model=model,
noise_width=noise_width,
noise_height=noise_height,
**kwargs,
)
return
@property
def local_grid_indices_file(self) -> Path:
"""The path to the local grid file."""
return self.surface.local_grid_indices_file
@property
def local_overlap(self) -> LocalSurface | None:
"""A LocalSurface object that contains the overlap between this object and the high resolution local region of the surface, or None if there is no overlap."""
if self._local_overlap is None:
if self.surface.local is None:
return None
if isinstance(self.face_indices, slice):
face_indices = self.surface.face_indices[self.face_indices]
node_indices = self.surface.node_indices[self.node_indices]
edge_indices = self.surface.edge_indices[self.edge_indices]
else:
face_indices = self.face_indices
node_indices = self.node_indices
edge_indices = self.edge_indices
self._face_mask = np.isin(face_indices, self.surface.local.face_indices, kind="table")
if not np.any(self._face_mask):
return None
shared_faces = face_indices[self._face_mask]
if len(shared_faces) == self.n_face:
# If all faces are shared, then we can assume all nodes and edges are also shared.
shared_nodes = self.node_indices
shared_edges = self.edge_indices
self._node_mask = np.full(self.n_node, True, dtype=bool)
self._edge_mask = np.full(self.n_edge, True, dtype=bool)
else:
self._node_mask = np.isin(node_indices, self.surface.local.node_indices, kind="table")
if not np.any(self._node_mask):
return None
shared_nodes = node_indices[self._node_mask]
self._edge_mask = np.isin(edge_indices, self.surface.local.edge_indices, kind="table")
if not np.any(self._edge_mask):
return None
shared_edges = edge_indices[self._edge_mask]
self._local_overlap = LocalSurface(
surface=self._surface,
face_indices=shared_faces,
node_indices=shared_nodes,
edge_indices=shared_edges,
region_radius=self._region_radius,
**vars(self.common_args),
)
self._local_overlap._location = self.location
self._local_overlap._face_distance = self.face_distance[self._face_mask]
self._local_overlap._face_bearing = self.face_bearing[self._face_mask]
self._local_overlap._node_distance = self.node_distance[self._node_mask]
self._local_overlap._node_bearing = self.node_bearing[self._node_mask]
return self._local_overlap
@property
def edge_mask(self) -> NDArray[np.bool] | None:
"""A boolean indicating which edge indices overlap with the local region."""
if self._edge_mask is None:
_ = self.local_overlap
return self._edge_mask
@property
def face_mask(self) -> NDArray[np.bool] | None:
"""A boolean indicating which face indices overlap with the local region."""
if self._face_mask is None:
_ = self.local_overlap
return self._face_mask
@property
def node_mask(self) -> NDArray[np.bool] | None:
"""A boolean indicating which node indices overlap with the local region."""
if self._node_mask is None:
_ = self.local_overlap
return self._node_mask