from __future__ import annotations
import hashlib
import os
import shutil
import tempfile
import warnings
from abc import abstractmethod
from contextlib import AbstractContextManager
from pathlib import Path
from typing import Any, Literal
import numpy as np
import pyvista as pv
import rasterio
import uxarray as uxr
import xarray as xr
from affine import Affine
from matplotlib.axes import Axes
from numpy.typing import ArrayLike, NDArray
from pyproj import CRS, Transformer
from rasterio import DatasetReader, MemoryFile, windows
from rasterio.enums import Resampling
from rasterio.merge import merge
from rasterio.transform import rowcol
from rasterio.vrt import WarpedVRT
from rasterio.windows import Window
from scipy.optimize import OptimizeWarning
from tqdm import tqdm
from uxarray import INT_FILL_VALUE, UxDataArray, UxDataset
from vtk import vtkUnstructuredGrid
from cratermaker.bindings import surface_bindings
from cratermaker.components.target import Target
from cratermaker.constants import _SMALLFAC, _VSMALL, FloatLike, PairOfFloats
from cratermaker.core.base import ComponentBase, CratermakerBase, import_components
from cratermaker.utils.general_utils import format_large_units, validate_and_normalize_location
from cratermaker.utils.montecarlo_utils import get_random_location_on_face
_N_TAG_LAYERS = 8
[docs]
class Surface(ComponentBase):
_registry: dict[str, type[Surface]] = {}
"""
Used for handling surface-related data and operations in the cratermaker project.
It provides methods for setting elevation data, calculating distances and bearings, and other surface-related computations.
The Surface class extends UxDataset for the cratermaker project.
"""
_registry: dict[str, type[Surface]] = {}
def __init__(
self,
target: Target | str | None = None,
simdir: str | Path | None = None,
**kwargs,
):
"""
**Warning:** This object should not be instantiated directly. Instead, use the ``.maker()`` method.
Parameters
----------
target : Target, optional
The target body or name of a known target body for the impact simulation.
reset : bool, optional
Flag to indicate whether to reset the surface. Default is the value of `regrid`
regrid : bool, optional
Flag to indicate whether to regrid the surface. Default is False.
simdir : str | Path
|simdir|
**kwargs : Any
|kwargs|
"""
from cratermaker.components.target import Target
super().__init__(simdir=simdir, **kwargs)
object.__setattr__(self, "_target", None)
object.__setattr__(self, "_uxds", None)
object.__setattr__(self, "_pix", None)
object.__setattr__(self, "_pix_mean", None)
object.__setattr__(self, "_pix_std", None)
object.__setattr__(self, "_pix_min", None)
object.__setattr__(self, "_pix_max", None)
object.__setattr__(self, "_area", None)
object.__setattr__(self, "_edge_tree", None)
object.__setattr__(self, "_edge_face_distance", None)
object.__setattr__(self, "_edge_lengths", None)
object.__setattr__(self, "_edge_indices", None)
object.__setattr__(self, "_node_tree", None)
object.__setattr__(self, "_face_tree", None)
object.__setattr__(self, "_face_area", None)
object.__setattr__(self, "_face_size", None)
object.__setattr__(self, "_face_bin_indices", None)
object.__setattr__(self, "_face_bin_argmin", None)
object.__setattr__(self, "_face_bin_argmax", None)
object.__setattr__(self, "_face_bin_area", None)
object.__setattr__(self, "_face_x", None)
object.__setattr__(self, "_face_y", None)
object.__setattr__(self, "_face_z", None)
object.__setattr__(self, "_node_x", None)
object.__setattr__(self, "_node_y", None)
object.__setattr__(self, "_node_z", None)
object.__setattr__(self, "_smallest_length", None)
object.__setattr__(self, "_crs", None)
object.__setattr__(self, "_output_file_prefix", "surface")
object.__setattr__(self, "_output_dir_name", "surface")
object.__setattr__(self, "_grid_file_prefix", "grid")
object.__setattr__(self, "_output_file_extension", "nc")
object.__setattr__(self, "_is_new", None)
self._output_file_pattern = [
f"{self._output_file_prefix}*.{self._output_file_extension}",
f"{self._grid_file_prefix}.{self._output_file_extension}",
]
self._data_variable_init = {
"node_elevation": {
"units": "m",
"long_name": "elevation of nodes",
"initial_value": 0.0,
"isfacedata": False,
},
"face_elevation": {
"units": "m",
"long_name": "elevation of faces",
"initial_value": 0.0,
"isfacedata": True,
},
}
self.target = Target.maker(target, **kwargs)
return
def __str__(self) -> str:
str_repr = super().__str__()
str_repr += f"Target: {self.target.name}\n"
str_repr += f"Grid File: {self.grid_file}\n"
str_repr += f"Number of faces: {self.n_face}\n"
str_repr += f"Number of nodes: {self.n_node}\n"
return str_repr
[docs]
@classmethod
def maker(
cls: Surface,
surface: str | Surface | None = None,
target: Target | str | None = None,
reset: bool = True,
regrid: bool = False,
simdir: str | Path | None = None,
**kwargs,
) -> Surface:
"""
Initialize a Surface model with the given name or instance.
Parameters
----------
surface : str or Surface, optional
The name of the type of grid used for the surface. Default is "icosphere".
target : Target, optional
The target body or name of a known target body for the impact simulation.
reset : bool, optional
Flag to indicate whether to reset the surface. Default is True.
regrid : bool, optional
Flag to indicate whether to regrid the surface. Default is False.
simdir : str | Path
|simdir|
**kwargs : Any
|kwargs|
Returns
-------
Surface
An initialized Surface object.
"""
if surface is None:
surface = "icosphere"
surface = super().maker(
component=surface,
target=target,
reset=reset,
regrid=regrid,
simdir=simdir,
**kwargs,
)
return surface
def __getattr__(self, name: str):
"""
Get attribute values from the uxds dataset.
Parameters
----------
name : str
The name of the attribute to retrieve.
"""
if not hasattr(self, "uxds"):
raise AttributeError(f"{type(self).__name__!s} has no attribute {name!r}")
uxds = object.__getattribute__(self, "uxds")
if name in uxds:
return uxds[name].data
raise AttributeError(f"{type(self).__name__!s} has no attribute {name!r}")
[docs]
def saved_output_files(self, **kwargs: Any) -> list[Path]:
"""
Check if the component has any output files in its output directory.
Returns
-------
list[Path]
A list of Path objects representing the files that would be removed during a reset operation. Returns an empty list if no files found
"""
return self._full().saved_output_files(**kwargs)
[docs]
def reset(self, **kwargs: Any) -> None:
"""
Reset the surface to its initial state.
Parameters
----------
**kwargs : Any
|kwargs|
"""
super().reset(**kwargs)
# Remove all old data from the dataset
varlist = list(self.uxds.data_vars)
for name in varlist:
if name not in self._data_variable_init:
del self.uxds[name]
for name, entry in self._data_variable_init.items():
self._add_new_data(
name=name,
data=entry["initial_value"],
long_name=entry["long_name"],
units=entry["units"],
isfacedata=entry["isfacedata"],
save_to_file=True,
)
self.is_new = True
return
[docs]
def add_data(
self,
name: str,
data: FloatLike | NDArray,
long_name: str | None = None,
units: str | None = None,
isfacedata: bool = True,
overwrite: bool = False,
fill_value: float = 0.0,
dtype=np.float64,
positive_only: bool = False,
**kwargs: Any,
) -> None:
"""
Adds new data to the surface.
If the data variable already exists, it will be overwritten if `overwrite` is set to True.
Parameters
----------
name : str
Name of the data variable. This will also be used as the data file name.
data : scalar or array-like
Data file to be saved. If data is a scalar, then the data file will be filled with that value. If data is an array, then the data file will be filled with the array values. The data array must have the same size as the number of faces or nodes in the grid.
long_name : str, optional
Long name of the data variable that will be saved as an attribute if this is new data. If the data already exists on the surface, this will be ignored.
units : str, optional
Units of the data variable that will be saved as an attribute if this is new data. If the data already exists on the surface, this will be ignored.
isfacedata : bool, optional, default True
Flag to indicate whether the data is face data or node data. This is only needed if `data` is a scalar, otherwise it is ignored
overwrite : bool, optional, default False
By default, new data is added to the old data. This flag indicates that the data should be overwritten, replacing any old data with the new data.
fill_value : float, optional
The fill value to use for new data variables. Default is 0.0.
dtype : data-type, optional
The data type of the data variable. Default is np.float64.
positive_only: bool, optional
If True, only allow positive values on the data (data will be clipped at 0.0). Default is False.
**kwargs : Any
|kwargs|
Returns
-------
None
"""
return self._full().add_data(
name=name,
data=data,
long_name=long_name,
units=units,
isfacedata=isfacedata,
overwrite=overwrite,
fill_value=fill_value,
dtype=dtype,
positive_only=positive_only,
**kwargs,
)
[docs]
def update_elevation(
self,
new_elevation: ArrayLike | FloatLike,
overwrite: bool = False,
**kwargs: Any,
) -> None:
"""
Update the elevation data for the target's surface mesh. This method will determine whether to update the node or face data (or both) depending on the size of the input data. If a scalar is passed, both node and face elevation will be updated to that value. By default, the elevation data will be added to the existing data. If `overwrite` is set to True, the existing data will be replaced with the new data.
Parameters
----------
new_elevation : ArrayLike | FloatLike
Elevation to be added (or replaced, if overwrite is True). This can be a scalar, an array with the same size as the number of faces, an array with the same size as the number of nodes, or an array with the same size as the number of faces + nodes.
overwrite : bool, optional
If True, the existing data will be replaced with the new data. Default is False.
**kwargs : Any
|kwargs|
Notes
-----
When passing combined data, the first part of the array will be used for face elevation and the second part for node elevation.
"""
return self._full().update_elevation(new_elevation=new_elevation, overwrite=overwrite, **kwargs)
[docs]
def add_tag(
self,
name: str,
tag: int | None = None,
long_name: str | None = None,
**kwargs: Any,
) -> None:
"""
Adds an integer tag to the surface.
Used primarily for tracking crater ids.
Parameters
----------
name : str
The name of the tag variable to perform the add operation on.
tag : int | None
The integer to value of the tag. If None is provided, the tag layers will be reset to zero
long_name : str | None
The long name of the tag variable. This is only used if the tag variable doesn't exist in the surface dataset. If None, no long name will be added.
**kwargs : Any
|kwargs|
"""
return self._full().add_tag(name=name, tag=tag, long_name=long_name, **kwargs)
[docs]
def remove_tag(self, name: str, tag: int, **kwargs: Any) -> None:
"""
Removes an integer tag from the surface.
Parameters
----------
name : str
The name of the tag variable to perform the remove operation on.
tag : int
The integer tag to be removed.
**kwargs : Any
|kwargs|
"""
return self._full().remove_tag(name=name, tag=tag, **kwargs)
[docs]
def apply_diffusion(self, kdiff: FloatLike | NDArray) -> None:
"""
Apply diffusion to the surface.
Parameters
----------
kdiff : float or array-like
The degradation state of the surface, which is the product of diffusivity and time. It can be a scalar or an array of the same size as the number of faces in the grid.
If it is a scalar, the same value is applied to all faces. If it is an array, it must have the same size as the number of faces in the grid.
The value of kdiff must be greater than 0.0.
"""
return self._full().apply_diffusion(kdiff)
[docs]
def slope_collapse(self, critical_slope_angle: FloatLike = 35.0) -> None:
"""
Collapse all slopes larger than the critical slope angle.
Parameters
----------
critical_slope_angle : float
The critical slope angle (angle of repose) in degrees.
"""
return self._full().slope_collapse(critical_slope_angle)
[docs]
def compute_slope(self) -> NDArray[np.float64]:
"""
Compute the slope of the surface.
Returns
-------
NDArray[np.float64]
The slope of all faces in degrees.
"""
return self._full().compute_slope()
[docs]
def apply_noise(
self,
model: str = "turbulence",
noise_width: FloatLike = 1000e3,
noise_height: FloatLike = 1e3,
**kwargs: Any,
) -> None:
"""
Apply noise to the surface.
Parameters
----------
model : str
The noise model to use. Options are "turbulence"
noise_width : float
The width of the noise in meters.
noise_height : float
The height of the noise in meters.
kwargs : Any
Additional arguments to pass to the noise model.
"""
return self._full().apply_noise(model=model, noise_width=noise_width, noise_height=noise_height, **kwargs)
[docs]
def calculate_face_and_node_distances(self, location: tuple[float, float], validate: bool = True) -> tuple[NDArray, NDArray]:
"""
Computes the distances from a given location to all faces and nodes.
Parameters
----------
location : tuple[float, float]
tuple containing the longitude and latitude of the location in degrees.
validate : bool, optional
If the location should be validated and normalized. Only use if the validation is causing a performance bottleneck.
Returns
-------
NDArray
Array of distances for each face in meters.
NDArray
Array of distances for each node in meters.
"""
return self._full().calculate_face_and_node_distances(location, validate=validate)
[docs]
def calculate_face_and_node_bearings(self, location: tuple[float, float]) -> tuple[NDArray, NDArray]:
"""
Computes the initial bearing (relative to North) from a given location to all faces and nodes.
Parameters
----------
location : tuple[float, float]
tuple containing the longitude and latitude of the location in degrees.
Returns
-------
NDArray
Array of initial bearings for each face in degrees.
NDArray
Array of initial bearings for each node in degrees.
Notes
-----
This is intended to be used as a helper to calculate_face_and_node_bearings.
"""
return self._full().calculate_face_and_node_bearings(location)
[docs]
def find_nearest_node(self, location):
"""
Find the index of the nearest node to a given point.
This method calculates the Haversine distance from the given point to each node in the grid, and returns the index of the node with the minimum distance.
Parameters
----------
location : tuple
A tuple containing two elements: (longitude, latitude) in degrees.
Returns
-------
int
The index of the nearest node in the grid to the given point.
Notes
-----
The method uses the ball tree query method that is included in the UxArray.Grid class.
"""
location = validate_and_normalize_location(location)
if len(location) == 1:
location = location.item()
coords = np.asarray(location)
with warnings.catch_warnings():
warnings.simplefilter("ignore", Warning)
node_ind = self.node_tree.query(coords=coords, k=1, return_distance=False)
return node_ind.item()
[docs]
def find_nearest_face(self, location):
"""
Find the index of the nearest face to a given point.
This method calculates the Haversine distance from the given point to each face in the grid, and returns the index of the face with the minimum distance.
Parameters
----------
location : tuple
A tuple containing two elements: (longitude, latitude) in degrees.
Returns
-------
int
The index of the nearest face in the grid to the given point.
Notes
-----
The method uses the ball tree query method that is included in the UxArray.Grid class. It is only approximate, as it looks for whichever face center is closest to the location. This means that it can return a neighboring face, rather than the face that contains the point.
"""
location = validate_and_normalize_location(location)
if len(location) == 1:
location = location.item()
coords = np.asarray(location)
with warnings.catch_warnings():
warnings.simplefilter("ignore", OptimizeWarning)
face_ind = self.face_tree.query(coords=coords, k=1, return_distance=False)
return face_ind.item()
[docs]
def interpolate_node_elevation_from_faces(self) -> None:
"""
Update node elevations by area-weighted averaging of adjacent face elevations.
For each node, the elevation is computed as the area-weighted average of the elevations
of the surrounding faces.
Returns
-------
None
"""
return self._full().interpolate_node_elevation_from_faces()
[docs]
def get_random_location_on_face(self, face_index: int, **kwargs: Any) -> float | tuple[float, float] | ArrayLike:
"""
Generate a random coordinate within a given face of a the mesh.
Parameters
----------
grid : uxarray.Grid
The grid object containing the mesh information.
face_index : int | NDArray[np.int64]
The index or array of indices of the face within the grid to obtain the random sample.
size : int or tuple of ints, optional
The number of samples to generate. If size is None (the default), a single tuple is returned. If size is greater than 1,
then an array of tuples is returned. The default is 1.
**kwargs : Any
|kwargs|
Returns
-------
(lon,lat) or ndarray[(lon,lat)] of given size
A pair or array of pairs of longitude and latitude values in degrees.
Notes
-----
This method is a wrapper for :py:meth:`cratermaker.utils.montecarlo_utils.get_random_location_on_face`.
"""
return get_random_location_on_face(self.uxgrid, face_index, rng=self.rng, **kwargs)
[docs]
def elevation_to_cartesian(self, element="face") -> NDArray[np.float64]:
"""
Convert either the face or node elevations to Cartesian coordinates.
Parameters
----------
element : str, optional
The type of element to convert. Can be "face" or "node". Default is "face".
Returns
-------
NDArray[np.float64, 3]
The Cartesian coordinates of the face elevations.
"""
return self._full().elevation_to_cartesian(element=element)
[docs]
def save(
self,
interval: int = 0,
time_variables: dict | None = None,
filename: str | None = None,
**kwargs,
) -> None:
"""
Save the surface data to the specified directory.
Each data variable is saved to a separate NetCDF file. If 'time_variables' is specified, then a one or more variables will be added to the dataset along the time dimension.
Parameters
----------
interval : int, optional
Interval number to append to the data file name. Default is 0.
time_variables : dict, optional
Dictionary containing one or more variable name and value pairs. These will be added to the dataset along the time dimension. Default is None.
**kwargs : Any
|kwargs|
"""
return self._full().save(
interval=interval,
time_variables=time_variables,
filename=filename,
**kwargs,
)
[docs]
def export(self, driver: str = "GPKG", interval: int | None = None, ask_overwrite: bool | None = None, **kwargs: Any) -> None:
"""
Export the surface data to the specified format.
Parameters
----------
driver : str, optional
The driver to use export the data to. Supported formats are 'VTK' or a driver supported by GeoPandas ('GPKG', 'ESRI Shapefile', etc.), and 'GeoTIFF'.
interval : int | None, optional
|interval_export|
ask_overwrite : bool, optional
|ask_overwrite_methods|
**kwargs : Any
|kwargs|
"""
return self._full().export(
driver=driver,
interval=interval,
ask_overwrite=ask_overwrite,
**kwargs,
)
[docs]
def to_vector_file(
self,
driver: str = "GPKG",
interval: int | None = None,
**kwargs,
) -> None:
"""
Export the face-associated data from the surface view data to a vector file using GeoPandas.
See `geopandas.GeoDataFrame.to_file <https://geopandas.org/en/stable/docs/reference/api/geopandas.GeoDataFrame.to_file.html>`_ for more detailed information on the available parameters.
Parameters
----------
driver : str, optional
The file format driver to use for exporting. Default is 'GPKG'.
interval : int | None, optional
|interval_export|
**kwargs : Any
|kwargs|
"""
return self._full().to_vector_file(driver=driver, interval=interval, **kwargs)
[docs]
def to_raster(
self, variable_name: str = "face_elevation", **kwargs: Any
) -> tuple[NDArray[np.float32], tuple[float, float, float, float], Any, CRS]:
"""
Rasterize a face-based variable into a 2D raster using rasterio.
Parameters
----------
variable_name : str, optional
The name of the variable to rasterize. Default is "face_elevation".
**kwargs : Any
|kwargs|
Returns
-------
raster : NDArray[np.float32]
The rasterized variable as a 2D numpy array.
extent : tuple[float, float, float, float]
The extent of the raster in the format (xmin, xmax, ymin, ymax).
transform : Affine
The affine transform for the raster.
crs : CRS
The coordinate reference system of the raster.
"""
return self._full().to_raster(variable_name, **kwargs)
[docs]
def get_raster_dims(self):
"""
Get the dimensions of the raster based on the target radius and pixel size.
Returns
-------
tuple[int, int]
The width and height of the raster in pixels.
"""
return self._full().get_raster_dims()
[docs]
def to_geotiff_file(
self,
interval: int | None = None,
variable_name: str = "face_elevation",
**kwargs,
) -> None:
"""
Rasterize a face-based elevation variable into a GeoTIFF using rasterio.
Parameters
----------
interval : int | None, optional
|interval_export|
variable_name : str, optional
The name of the variable to rasterize. Default is "face_elevation".
**kwargs : Any
|kwargs|
"""
return self._full().to_geotiff(
interval=interval,
variable_name=variable_name,
**kwargs,
)
[docs]
def to_vtk_mesh(self, uxds: UxDataset | None = None, interval: int | None = None, **kwargs: Any) -> vtkUnstructuredGrid:
"""
Exports the surface to a VTK PolyData object.
Parameters
----------
uxds : UxDataset, optional
The dataset to export. If None, the method will use currently loaded data in the surface. Default is None.
interval : int, optional
The interval number to export. If provided, the method will either extract the interval number from uxds (if it has intervals saved), or, if no uxds is passed, load a saved interval from file.
**kwargs : Any
|kwargs|
Returns
-------
vtkUnstructuredGrid
The VTK PolyData object representing the regional mesh.
"""
return self._full().to_vtk_mesh(uxds=uxds, interval=interval, **kwargs)
[docs]
def to_vtk_file(
self,
interval: int | None = None,
**kwargs: Any,
) -> None:
"""
Export the surface mesh to a VTK file and store it in the default export directory.
Parameters
----------
interval : int | None, optional
|interval_export|
**kwargs : Any
|kwargs|
"""
return self._full().to_vtk_file(
interval=interval,
**kwargs,
)
[docs]
def plot(
self,
plot_style: Literal["map", "hillshade"] = "map",
variable_name: str | None = None,
interval: int | None = None,
cmap: str | None = None,
label: str | None = None,
scalebar: bool | None = None,
colorbar: bool = True,
show: bool = True,
save: bool = False,
ax: Axes | None = None,
close_when_done: bool | None = None,
minimum_plot_width: float | None = 800.0,
**kwargs: Any,
) -> Axes:
"""
Plot an image of the surface.
Parameters
----------
plot_style : str, optional
The style of the plot. Options are "map" and "hillshade". In "map" mode, the variable is displayed as a colored map. In "hillshade" mode, a hillshade image is generated using "face_elevation" data. If a different variable is passed to `variable`, then the hillshade will be overlayed with that variable's data. Default is "map".
variable_name : str | None, optional
The variable to plot. If None is provided then "face_elevation" is used in "map" mode.
interval: int | None, optional
The interval number of the surface to plot. If None, the currently loaded surface data will be used.
cmap : str, optional
The colormap to use for the plot. If None, a default colormap will be used ("cividis" by default and "grey" when plot_style=="hillshade" and variable=="face_elevation").
label : str | None, optional
A label for the plot. If None, no label will be added.
scalebar : bool, optional
If True, a scalebar will be added to the plot. Default is False for global surfaces.
colorbar : bool, optional
If True, a colorbar will be added to the plot when using "map" plot_style or "hillshade" with a variable overlay. Default is True.
show : bool, optional
If True, the plot will be displayed. Default for local surfaces is True.
save : bool, optional
If True, the plot will be saved to the default plot directory. Default is False.
ax : matplotlib.axes.Axes, optional
An existing Axes object to plot on. If None, a new figure and axes will be created.
close_when_done : bool, optional
If True, the figure will be closed after plotting. Default is True when save is True and show is False, and False otherwise.
minimum_plot_width : float, optional
Because the width of the plot is determined by the number of faces, small regions will generate small plots with labels that are hard to read. This parameter sets a lower limit to the width of the image that is generated by the plot. By default it is 800. Set to None to turn it off.
**kwargs : Any
|kwargs|
Returns
-------
matplotlib.axes.Axes
A Matplotlib Axes object containing the plot.
"""
if scalebar is None:
scalebar = False
return self._full().plot(
variable_name=variable_name,
plot_style=plot_style,
cmap=cmap,
interval=interval,
label=label,
scalebar=scalebar,
colorbar=colorbar,
show=show,
save=save,
ax=ax,
minimum_plot_width=minimum_plot_width,
**kwargs,
)
[docs]
def pyvista_plotter(
self,
variable_name: str | None = None,
variable: ArrayLike | None = None,
interval: int | None = None,
theme: str | None = None,
transparent_background: bool | None = None,
plotter: pv.Plotter | None = None,
enable_interactive: bool = True,
**kwargs: Any,
) -> pv.Plotter:
"""
Show the surface region using an interactive 3D plot with PyVista.
Parameters
----------
variable_name : str, optional
The name of the variable to plot. If the name of the variable is not already stored on the surface mesh, then the `variable` argument must also be passed. Default is None, which will plot a greyscale image of the surface.
variable: (n_face) array, optional
An array face values that will be used to color the surface mesh. This is required if `variable_name` is not stored on the mesh.
interval : int | None, optional
The interval number of the surface to show. If None, the currently loaded surface data will be displayed. Default is None.
theme : str, optional
The PyVista theme to use for the plot. If None, the default PyVista theme will be used.
transparent_background : bool, optional
If True, the background of the plot will be transparent. Default is False.
plotter : pv.Plotter, optional
An existing PyVista Plotter object to use for the plot. If None, a new Plotter object will be created. Default is None.
enable_interactive : bool, optional
If True, the key events for the plotter will be updated to include custom events for navigating between intervals. Default is True.
**kwargs : Any
|kwargs|
Returns
-------
pyvista.Plotter
The PyVista Plotter object for further customization.
"""
return self._full().pyvista_plotter(
variable_name=variable_name,
variable=variable,
interval=interval,
theme=theme,
transparent_background=transparent_background,
plotter=plotter,
enable_interactive=enable_interactive,
**kwargs,
)
[docs]
def compute_distances(
self,
locations: ArrayLike,
reference_location: PairOfFloats,
) -> NDArray[np.float64]:
"""
Calculate the great circle distance between one point and one or more other points in meters.
Parameters
----------
locations : FloatLike or ArrayLike
Array of (lon, lat) locations of the second point or array of points in degrees.
reference_location : PairOfFloats
(lon, lat) location of the reference point in degrees.
Returns
-------
NDArray
Great circle distance between the two points in meters.
Notes
-----
This is a wrapper for a compiled Rust function and is intended to be used as a helper to calculate_face_and_node_distances.
"""
return self._full().compute_distances(locations=locations, reference_location=reference_location)
[docs]
def compute_bearings(
self,
locations: ArrayLike,
reference_location: PairOfFloats,
) -> NDArray[np.float64]:
"""
Calculate the initial bearing relative to North from one point to one or more other points in degrees.
Parameters
----------
locations : ArrayLike
Longitude and latitude of the second point or array of points in degrees.
reference_location : PairOfFloats
Longitude and latitude of the first point in degrees.
Returns
-------
NDArray
Initial bearing from the first point to the second point or points in degrees.
"""
return self._full().compute_bearings(locations=locations, reference_location=reference_location)
[docs]
def compute_location_from_distance_bearing(
self,
distance: FloatLike | ArrayLike,
bearing: FloatLike | ArrayLike,
reference_location: PairOfFloats,
) -> NDArray[np.float64]:
"""
Calculate the longitude and latitude of one or more points given a reference point, initial bearings, and distances.
Parameters
----------
bearings : FloatLike or ArrayLike
Initial bearing from the reference point to the target point or points in degrees.
distances : FloatLike or ArrayLike
Great circle distance from the reference point to the target point or points in meters.
reference_location : PairOfFloats
Longitude and latitude of the reference point in degrees.
Returns
-------
NDArray
Longitude and latitude of the target point or points in degrees.
"""
return self._full().compute_location_from_distance_bearing(
distance=distance,
bearing=bearing,
reference_location=reference_location,
)
@staticmethod
def _compute_elevation_to_cartesian(position: NDArray, elevation: NDArray) -> NDArray:
"""
Convert elevation values to Cartesian coordinates.
Parameters
----------
position : NDArray
The position of the points in Cartesian coordinates.
elevation : NDArray
The elevation values to convert.
Returns
-------
NDArray
The Cartesian coordinates of the points with the given elevation.
"""
runit = position / np.linalg.norm(position, axis=1, keepdims=True)
return position + elevation[:, np.newaxis] * runit
[docs]
def read_saved_output(self, interval: int | None = None, reset: bool = False, **kwargs: Any) -> UxDataset:
"""
Load the grid and data files into a UxDataset object.
Parameters
----------
interval : int, optional
Interval number to read from the data files. Default is None (all saved intervals)
reset : bool, optional
Flag to indicate whether to reset the surface. If True it reads in the grid but creates an empty dataset. Default is False.
**kwargs : Any
|kwargs|
Returns
-------
UxDataset
An initialized UxDataset object containing the grid and data.
"""
return self._full().read_saved_output(interval=interval, reset=reset, **kwargs)
def _load_from_files(self, interval: int = -1, reset: bool = False, regrid: bool = False, **kwargs: Any) -> None:
"""
Load the grid and data files into the surface object.
This function loads the grid file and data files from the specified directory. If the grid file does not exist, it will attempt to create a new grid. If the data files do not exist, it will create an empty dataset. If reset is True, it will delete all data files except the grid file.
Parameters
----------
interval : int, optional
Interval number to read from the data files. Default is -1 (the last saved interval).
reset : bool, optional
Flag to indicate whether to reset the surface. Default is False.
regrid : bool, optional
Flag to indicate whether to regrid the surface. Default is False.
"""
# Get the names of all data files in the data directory that are not the grid file
regrid = self._regrid_if_needed(regrid=regrid, **kwargs)
reset = reset or regrid
self.is_new = reset
ask_overwrite = self.ask_overwrite # Store this in case of regridding. If regridding, we need it to be False for the reset
if regrid:
self.ask_overwrite = False
# Read in only the last saved data file
uxds = self.read_saved_output(interval=interval, reset=reset, **kwargs)
if "interval" in uxds.dims:
uxds = uxds.isel(interval=-1)
object.__setattr__(self, "_uxds", uxds)
if reset:
self.reset(**kwargs)
self.ask_overwrite = ask_overwrite # Restore in case we had to set it to False for the reset during regridding
return
def _write_grid_file(self, uxgrid: uxr.Grid | None = None, grid_file: Path | str | None = None, **kwargs: Any) -> None:
"""
Write the grid to a NetCDF file.
Parameters
----------
uxgrid : uxr.Grid, optional
The grid to be written to the file. If None, the grid will be obtained from the surface object. Default is None.
grid_file : Path, str, optional
The path to the grid file. If None, the path will be obtained from the surface object. Default is None.
**kwargs : Any
|kwargs|
"""
import uxarray.conventions.ugrid as ugrid
if uxgrid is None:
uxgrid = self.uxgrid
if grid_file is None:
grid_file = self.grid_file
else:
grid_file = Path(grid_file)
grid_file.unlink(missing_ok=True)
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
ds = uxgrid.to_xarray()
# There is some issue that is causing a mismatch between the data variables and the grid_topology attributes that is causing problems when reading the file back in under some circumstances, such as deleting the grid file and reinitializing the surface. To get around this, we will check that all of the attributes are correct before writing the file.
ugrid_names = ugrid.CONNECTIVITY_NAMES + ugrid.DIM_NAMES
for name in ugrid_names:
if name in ds.grid_topology.attrs and name not in ds:
del ds.grid_topology.attrs[name]
if "edge_dimension" in ds.grid_topology.attrs and "n_edge" not in ds:
del ds.grid_topology.attrs["edge_dimension"]
if "face_coordinates" in ds.grid_topology.attrs:
face_coords = ds.grid_topology.attrs["face_coordinates"].split()
for coord in face_coords:
if coord not in ds:
del ds.grid_topology.attrs["face_coordinates"]
break
ds.to_netcdf(temp_file.name)
temp_file.flush()
os.fsync(temp_file.fileno())
# Replace the original file only if writing succeeded
shutil.move(temp_file.name, grid_file)
return
def _generate_grid(self, **kwargs: Any) -> None:
"""
Generate a tessellated mesh of a sphere of based on the particular Surface component that is being used.
"""
# Ignore divide by zero warnings that occur due to matmul on some systems (e.g. MacOS with M4 chips). These warnings do not affect the functionality of the code, so we can safely ignore them.
orig_settings = np.seterr(divide="ignore", over="ignore", invalid="ignore")
points = self._generate_face_distribution(**kwargs)
threshold = min(10 ** np.floor(np.log10(self.pix / self.radius)), 1e-7)
uxgrid = uxr.Grid.from_points(points, method="spherical_voronoi", threshold=threshold)
uxgrid.attrs["_id"] = self._id
self._write_grid_file(uxgrid=uxgrid)
regrid = not self._is_same_grid
if regrid:
raise ValueError("Grid file does not match the expected parameters.")
self._compute_face_size(uxgrid)
np.seterr(**orig_settings)
return
@property
def _is_same_grid(self):
"""Check if the existing grid matches the one defined by the current parameters and returns True if they match after regridding."""
try:
with xr.open_dataset(self.grid_file) as ds:
ds.load()
uxgrid = uxr.Grid.from_dataset(ds)
old_id = uxgrid.attrs.get("_id")
return old_id == self._id
except Exception:
# Failed to open an old file for whatever reason, so we'll need to regrid
print("Generating a new grid.")
return False
def _regrid_if_needed(self, regrid: bool = False, **kwargs: Any) -> bool:
"""
Check if the existing grid matches the desired parameters determine if regridding is necessary.
This function checks if a grid file exists and matches the specified parameters based on a unique hash generated from these
parameters. If the grid does not exist or does not match the parameters it generates a new grid and returns True.
Parameters
----------
regrid: bool, optional
Flag to force regridding even if the grid file exists. Default is False.
Returns
-------
bool
A boolean indicating whether grid was regenerated.
"""
# Find out if the file exists, if it does't we'll need to make a new grid
self.output_dir.mkdir(parents=True, exist_ok=True)
regrid = regrid or not Path(self.grid_file).exists()
if not regrid:
regrid = not self._is_same_grid
if regrid:
print("Creating a new grid")
self._generate_grid(**kwargs)
return regrid
def _full(self) -> LocalSurface:
return LocalSurface(
self, face_indices=slice(None), node_indices=slice(None), edge_indices=slice(None), **vars(self.common_args)
)
def _save_data(
self, ds: xr.Dataset | xr.DataArray, interval: int = 0, filename: Path | None = None, reset: bool = False, **kwargs: Any
) -> None:
"""
Save the data to the specified directory.
Parameters
----------
ds : xr.Dataset or xr.DataArray
The data to be saved.
interval : int, Default is 0.
Interval number to append to the data file name. Default is 0.
filename : Path | None = None
The name of the data file. If None, the name will be generated from the variable name and interval number.
**kwargs : Any
|kwargs|
Notes
-----
This function first saves to a temporary file and then moves that file to the final destination. This is done to avoid file
locking issues with NetCDF files.
"""
if isinstance(ds, xr.DataArray):
ds = ds.to_dataset()
if "interval" not in ds.dims:
ds = ds.expand_dims(["interval"])
if "interval" not in ds.coords:
ds = ds.assign_coords({"interval": [interval]})
ds.load()
if filename is None:
filename = self.output_filename(interval)
else:
filename = Path(filename)
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True, **kwargs) as temp_dir:
data_file = self.output_dir / filename
if data_file.exists():
with xr.open_mfdataset(data_file) as ds_old:
ds_old = ds_old.load()
ds_file = ds.merge(ds_old, compat="override")
else:
ds_file = ds
temp_file = Path(temp_dir) / filename
comp = {"zlib": True, "complevel": 9}
encoding = dict.fromkeys(ds_file.data_vars, comp)
ds_file.to_netcdf(temp_file, encoding=encoding, **kwargs)
ds_file.close()
ds.close()
shutil.move(temp_file, data_file)
return
def _add_new_data(
self,
name: str,
long_name: str | None = None,
units: str | None = None,
data: FloatLike | NDArray | None = None,
isfacedata: bool = True,
save_to_file: bool = False,
interval: int = 0,
dtype=np.float64,
**kwargs: Any,
) -> None:
"""
Generate either a node or face data variable and optionally save it to a file. If the data variable already exists, it will be overwritten.
Parameters
----------
name : str
Name of the data variable. This will also be used as the data file name.
long_name : str, optional
Long name of the data variable that will be saved as an attribute.
units : str, optional
Units of the data variable that will be saved as an attribute.
data : scalar or array-like
Data file to be saved. If data is a scalar, then the data file will be filled with that value. If data is an array, then the data file will be filled with the array values. The data array must have the same size as the number of faces or nodes in the grid.
isfacedata : bool, optional
Flag to indicate whether the data is face data or node data. Default is True.
save_to_file: bool, optional
Specify whether the data should be saved to a file. Default is False.
interval : int, optional, default 0
The interval number to use when saving the data to the data file.
dtype : data-type, optional
The data type of the data variable. Default is np.float64.
**kwargs : Any
|kwargs|
"""
if self.uxgrid is None:
with xr.open_dataset(self.grid_file, **kwargs) as ds:
ds.load()
uxgrid = uxr.Grid.from_dataset(ds)
else:
uxgrid = self.uxgrid
if long_name is None and units is None:
attrs = None
else:
attrs = {}
if long_name:
attrs["long_name"] = long_name
if units:
attrs["units"] = units
if isfacedata:
dims = "n_face"
size = uxgrid.n_face
else:
dims = "n_node"
size = uxgrid.n_node
if data is None:
data = np.zeros(size, dtype=dtype)
elif np.isscalar(data):
data = np.full(size, data, dtype=dtype)
else:
if data.size != size:
raise ValueError("data must have the same size as the number of faces or nodes in the grid")
uxda = UxDataArray(data=data, dims=dims, attrs=attrs, name=name, uxgrid=uxgrid, **kwargs)
self._uxds[name] = uxda
if save_to_file:
self._save_data(uxda, interval=interval, **kwargs)
return
@abstractmethod
def _generate_face_distribution(self, **kwargs: Any) -> NDArray: ...
def _compute_face_size(self, uxgrid: UxDataset | None = None) -> None:
"""
Compute the effective pixel size of the mesh based on the face areas.
Parameters
----------
uxgrid : UxDataset
The grid object containing the mesh information. If not set, it will be retrieved self.uxds.uxgrid
"""
if uxgrid is None:
if self.uxgrid is None:
return
else:
uxgrid = self.uxgrid
self._face_area = uxgrid.face_areas.values * self.radius**2
self._face_size = np.sqrt(self._face_area)
self._pix_mean = float(self._face_size.mean().item())
self._pix_std = float(self._face_size.std().item())
self._pix_min = float(self._face_size.min().item())
self._pix_max = float(self._face_size.max().item())
return
[docs]
def get_location_extents(self, location: PairOfFloats, radius: FloatLike) -> tuple[float, float, float, float]:
"""
Computes the longitude and latitude extents of a given region.
Parameters
----------
location : PairOfFloats
The center of the region.
radius : FloatLike
The radius of the region.
Returns
-------
lon_min : float
Minimum longitude in degrees.
lon_max : float
Maximum longitude in degrees.
lat_min : float
Minimum latitude in degrees.
lat_max : float
Maximum latitude in degrees.
"""
R = self.target.radius
lon0, lat0 = location
alpha_deg = np.degrees(radius / R)
lat_min = max(-90.0, lat0 - alpha_deg)
lat_max = min(90.0, lat0 + alpha_deg)
# If we hit a pole, longitudes span everything
if abs(lat0) + alpha_deg >= 90.0:
lon_min, lon_max = -180.0, 180.0
else:
half_lon_span = alpha_deg / np.cos(np.radians(lat0))
half_lon_span = min(half_lon_span, 180.0)
lon_min = lon0 - half_lon_span
lon_max = lon0 + half_lon_span
return float(lon_min), float(lon_max), float(lat_min), float(lat_max)
@property
def _hashvars(self):
"""
The variables used to generate the hash.
"""
return [self.component_name, self.target.name, self.radius]
@property
def _id(self):
"""
The hash id of the grid. This is used for determining if the grid needs to be regridded.
"""
combined = ":".join(str(v) for v in self._hashvars)
combined += "v2026.2.0" # Add version number because of API changes
hash_object = hashlib.sha256(combined.encode())
return hash_object.hexdigest()
@property
def uxds(self) -> UxDataset:
"""The data associated with the surface as an instance of UxDataset."""
return self._uxds
@property
def uxgrid(self):
"""The grid object as an instance of a UxArray Grid."""
if self.uxds is not None:
return self.uxds.uxgrid
@property
def gridtype(self):
"""The name of the grid type."""
return self._component_name
@property
def grid_file(self):
"""Path to the grid file."""
return self.output_dir / f"{self._grid_file_prefix}.{self._output_file_extension}"
@property
def target(self):
"""The Target object associated with this Surface object."""
return self._target
@target.setter
def target(self, value):
from cratermaker.components.target import Target
self._target = Target.maker(value)
return
@property
def pix(self) -> float:
"""The effective pixel size of the mesh in meters."""
if self._pix is None:
self._pix = self.pix_mean
return self._pix
@property
def pix_mean(self) -> float:
"""The mean pixel size of the mesh in meters."""
if self._pix_mean is None and self.uxgrid is not None:
self._compute_face_size()
return self._pix_mean
@property
def pix_std(self) -> float:
"""The standard deviation of the pixel size of the mesh in meters."""
if self._pix_std is None and self.uxgrid is not None:
self._compute_face_size()
return self._pix_std
@property
def pix_min(self) -> float:
"""The minimum pixel size of the mesh in meters."""
if self._pix_min is None and self.uxgrid is not None:
self._compute_face_size()
return self._pix_min
@property
def pix_max(self) -> float:
"""The maximum pixel size of the mesh in meters."""
if self._pix_max is None and self.uxgrid is not None:
self._compute_face_size()
return self._pix_max
@property
def radius(self) -> float:
"""Radius of the target body in meters."""
return self.target.radius
@property
def area(self) -> float:
"""Total surface area of the target body in m²."""
if self._area is None:
self._area = float(self.face_area.sum())
return self._area
@property
def face_area(self) -> NDArray[np.float64]:
"""An array of areas of each individual face in m²."""
if self._face_area is None:
self._compute_face_size()
return self._face_area
@property
def face_size(self) -> NDArray[np.float64]:
"""The effective size of each face in meters, which s simply the square root of the face area, but is useful for certain comparisons and is equivalent to the `pix` variable from CTEM."""
if self._face_size is None:
self._compute_face_size()
return self._face_size
@property
def smallest_length(self) -> float:
"""The smallest length of the mesh in meters."""
if self._smallest_length is None:
self._smallest_length = float(np.min(self.face_size) * _SMALLFAC)
return self._smallest_length
@property
def face_lat(self) -> NDArray[np.float64]:
"""Latitude of the center of each face in degrees."""
return self.uxgrid.face_lat.values
@property
def face_lon(self) -> NDArray[np.float64]:
"""Longitude of the center of each face in degrees."""
return self.uxgrid.face_lon.values
@property
def face_x(self) -> NDArray[np.float64]:
"""Cartesian x location of the center of each face in meters."""
if self._face_x is None:
self._face_x = self.uxgrid.face_x.values * self.radius
return self._face_x
@property
def face_y(self) -> NDArray[np.float64]:
"""Cartesian y location of the center of each face in meters."""
if self._face_y is None:
self._face_y = self.uxgrid.face_y.values * self.radius
return self._face_y
@property
def face_z(self) -> NDArray[np.float64]:
"""Cartesian z location of the center of each face in meters."""
if self._face_z is None:
self._face_z = self.uxgrid.face_z.values * self.radius
return self._face_z
@property
def node_lat(self) -> NDArray[np.float64]:
"""Latitude of each node in degrees."""
return self.uxgrid.node_lat.values
@property
def node_lon(self) -> NDArray[np.float64]:
"""Longitude of each node in degrees."""
return self.uxgrid.node_lon.values
@property
def node_x(self) -> NDArray[np.float64]:
"""Cartesian x location of each node in meters."""
if self._node_x is None:
self._node_x = self.uxgrid.node_x.values * self.radius
return self._node_x
@property
def node_y(self) -> NDArray[np.float64]:
"""Cartesian y location of each node in meters."""
if self._node_y is None:
self._node_y = self.uxgrid.node_y.values * self.radius
return self._node_y
@property
def node_z(self) -> NDArray[np.float64]:
"""Cartesian z location of each node in meters."""
if self._node_z is None:
self._node_z = self.uxgrid.node_z.values * self.radius
return self._node_z
@property
def face_indices(self) -> NDArray[np.int64]:
"""The indices of the faces of the surface."""
return self.uxds.n_face.values
@property
def node_indices(self) -> NDArray[np.int64]:
"""The indices of the nodes of the surface."""
return self.uxds.n_node.values
@property
def edge_indices(self) -> NDArray[np.int64]:
"""The indices of the edges of the surface."""
if self._edge_indices is None:
self._edge_indices = np.arange(self.uxgrid.n_edge, dtype=np.int64)
return self._edge_indices
def _compute_face_bins(self) -> None:
"""
Compute the face bins based on the face areas. This is used to bin faces by their area for crater generation.
"""
min_area = self.face_area.min()
max_area = self.face_area.max()
max_bin_index = np.ceil(np.log2(max_area / min_area)).astype(int)
bins = [[] for _ in range(max_bin_index)]
for face_index, area in enumerate(self.face_area):
bin_index = np.floor(np.log2(area / min_area)).astype(int)
bins[bin_index].append(face_index)
self._face_bin_indices = [np.array(bins[i]) for i in range(max_bin_index) if len(bins[i]) > 0]
self._face_bin_area = [np.sum(self.face_area[face_indices]) for face_indices in self.face_bin_indices]
self._face_bin_argmin = [
int(face_indices[np.argmin(self.face_area[face_indices])]) for face_indices in self._face_bin_indices
]
self._face_bin_argmax = [
int(face_indices[np.argmax(self.face_area[face_indices])]) for face_indices in self._face_bin_indices
]
return
@property
def face_bin_indices(self) -> list[NDArray]:
"""
Faces binned based on their area.
All faces within a factor of 2 in area are in the same bin. This property returns a list of face indices lists for each bin.
The keys are the bin indices, and the values are lists of face indices of faces within that bin.
This is used when generating craters on surfaces with varying face sizes, so that the smallest crater is sized for the smallest face of a particular bin, rather than for the entire surface.
"""
if self._face_bin_indices is None:
self._compute_face_bins()
return self._face_bin_indices
@property
def face_bin_area(self) -> list[float]:
"""The total area of all faces in each bin in m²."""
if self._face_bin_area is None:
self._compute_face_bins()
return self._face_bin_area
@property
def face_bin_argmin(self) -> list[int]:
"""The index of the smallest face in each bin."""
if self._face_bin_argmin is None:
self._compute_face_bins()
return self._face_bin_argmin
@property
def face_bin_argmax(self) -> list[int]:
"""The index of the largest face in each bin."""
if self._face_bin_argmax is None:
self._compute_face_bins()
return self._face_bin_argmax
@property
def face_bin_min_areas(self) -> list[float]:
"""The area of the smallest face in each bin in m²."""
if self._face_bin_argmin is None:
self._compute_face_bins()
return [float(self.face_area[face_index]) for face_index in self.face_bin_argmin]
@property
def face_bin_max_areas(self) -> list[float]:
"""The area of the largest face in each bin in m²."""
if self._face_bin_argmax is None:
self._compute_face_bins()
return [float(self.face_area[face_index]) for face_index in self.face_bin_argmax]
@property
def face_bin_min_sizes(self) -> list[float]:
"""The effective size of the smallest face in each bin in meters."""
if self._face_bin_argmin is None:
self._compute_face_bins()
return [float(self.face_size[face_index]) for face_index in self.face_bin_argmin]
@property
def face_bin_max_sizes(self) -> list[float]:
"""The effective size of the largest face in each bin in meters."""
if self._face_bin_argmax is None:
self._compute_face_bins()
return [float(self.face_size[face_index]) for face_index in self.face_bin_argmax]
@property
def n_face(self) -> int:
"""Total number of faces."""
return int(self.uxgrid.n_face)
@property
def n_node(self) -> int:
"""Total number of nodes."""
return int(self.uxgrid.n_node)
@property
def n_edge(self) -> int:
"""Total number of edges."""
return int(self.uxgrid.n_edge)
@property
def n_nodes_per_face(self) -> NDArray[np.int64]:
"""
The number of nodes that make up each face.
Dimensions: `(n_node, )`
"""
return self.uxgrid.n_nodes_per_face.values
@property
def n_max_face_faces(self) -> int:
"""The maximum number of faces that surround a face."""
return int(self.uxgrid.n_max_face_faces)
@property
def edge_face_distance(self) -> NDArray[np.float64]:
"""Distances between the centers of the faces that saddle each edge in meters."""
if self._edge_face_distance is None:
self._edge_face_distance = surface_bindings.compute_edge_distances(
edge_connectivity=self.edge_face_connectivity,
lon=np.radians(self.face_lon),
lat=np.radians(self.face_lat),
radius=self.radius,
)
return self._edge_face_distance
@property
def edge_length(self) -> NDArray[np.float64]:
"""
Lengths of each edge in meters.
Notes
-----
This is computed as the distance between the nodes that define the edge
"""
if self._edge_lengths is None:
self._edge_lengths = surface_bindings.compute_edge_distances(
edge_connectivity=self.edge_node_connectivity,
lon=np.radians(self.node_lon),
lat=np.radians(self.node_lat),
radius=self.radius,
)
return self._edge_lengths
@property
def face_node_connectivity(self) -> NDArray[np.int64]:
"""
Indices of the nodes that make up each face.
Dimensions: `(n_face, n_max_face_nodes)`
Nodes are in counter-clockwise order.
"""
return self.uxgrid.face_node_connectivity.values
@property
def face_face_connectivity(self) -> NDArray[np.int64]:
"""
Indices of the faces that surround each face.
Dimensions: `(n_face, n_max_face_faces)`
"""
return self.uxgrid.face_face_connectivity.values
@property
def face_edge_connectivity(self) -> NDArray[np.int64]:
"""
Indices of the edges that surround each face.
Dimensions (n_face, n_max_face_edges)
"""
return self.uxgrid.face_edge_connectivity.values
@property
def node_face_connectivity(self) -> NDArray[np.int64]:
"""
Indices of the faces that surround each node.
Dimensions: `(n_node, n_max_node_faces)`
"""
return self.uxgrid.node_face_connectivity.values
@property
def edge_face_connectivity(self) -> NDArray[np.int64]:
"""
Indices of the faces that saddle each edge.
Dimensions (n_edge, 2)
"""
return self.uxgrid.edge_face_connectivity.values
@property
def edge_node_connectivity(self) -> NDArray[np.int64]:
"""
Indices of the nodes that define each edge.
Dimensions (n_edge, 2)
"""
return self.uxgrid.edge_node_connectivity.values
@property
def face_tree(self):
"""The BallTree for the face centers, which can be used for efficient spatial queries."""
if self._face_tree is None:
self._face_tree = self.uxgrid.get_ball_tree(
"face centers",
distance_metric="haversine",
coordinate_system="spherical",
reconstruct=True,
)
return self._face_tree
@property
def node_tree(self):
"""The BallTree for the nodes, which can be used for efficient spatial queries."""
if self._node_tree is None:
self._node_tree = self.uxgrid.get_ball_tree(
"nodes",
distance_metric="haversine",
coordinate_system="spherical",
reconstruct=True,
)
return self._node_tree
@property
def edge_tree(self):
"""The BallTree for the edge centers, which can be used for efficient spatial queries."""
if self._edge_tree is None:
self._edge_tree = self.uxgrid.get_ball_tree(
"edge centers",
distance_metric="haversine",
coordinate_system="spherical",
reconstruct=True,
)
return self._edge_tree
[docs]
@staticmethod
def get_crs(radius: float, name: str, location: tuple[float, float] | None = None) -> CRS:
"""
Returns either a CRS for a global sphere or a local azimuthal equidistant projection centered at the given location if one is provided.
Parameters
----------
radius : float
The radius of the sphere in meters.
name : str
The name of the target body.
location : tuple[float, float] | None, optional
The location of the center of the LAEA projection in degrees. If None, a global CRS is returned. Default is None.
"""
if location is None:
wkt = (
f'GEOGCS["GCS_{name}_global",'
f' DATUM["D_{name}",'
f' ELLIPSOID["{name}",{radius:.6f},0,'
f' LENGTHUNIT["metre",1]]],'
f' PRIMEM["Reference_Meridian",0,'
f' ANGLEUNIT["degree",0.0174532925199433]]],'
f" CS[ellipsoidal,2],"
f' AXIS["geodetic latitude (Lat)",north,'
f" ORDER[1],"
f' ANGLEUNIT["degree",0.0174532925199433]],'
f' AXIS["geodetic longitude (Lon)",east,'
f" ORDER[2],"
f' ANGLEUNIT["degree",0.0174532925199433]],'
f'REMARK["Created by Cratermaker"]]'
)
else:
lon0, lat0 = location
wkt = (
f'PROJCRS["Proj_{name}_region_location_{lon0}_{lat0}_Lambert_Azimuthal_Equal_Area",'
f' BASEGEOGCRS["GCS_{name}_global",'
f' DATUM["D_{name}",'
f' ELLIPSOID["{name}",{radius:.6f},0,'
f' LENGTHUNIT["metre",1]]],'
f' PRIMEM["Reference_Meridian",0,'
f' ANGLEUNIT["degree",0.0174532925199433]]],'
f' CONVERSION["Lambert Azimuthal Equal Area",'
f' METHOD["Lambert Azimuthal Equal Area (Spherical)",'
f' ID["EPSG",1027]],'
f' PARAMETER["Latitude of natural origin",{lat0},'
f' ANGLEUNIT["degree",0.0174532925199433],'
f' ID["EPSG",8801]],'
f' PARAMETER["Longitude of natural origin",{lon0},'
f' ANGLEUNIT["degree",0.0174532925199433],'
f' ID["EPSG",8802]],'
f' PARAMETER["False easting",0,'
f' LENGTHUNIT["metre",1],'
f' ID["EPSG",8806]],'
f' PARAMETER["False northing",0,'
f' LENGTHUNIT["metre",1],'
f' ID["EPSG",8807]]],'
f" CS[Cartesian,2],"
f' AXIS["(E)",east,'
f" ORDER[1],"
f' LENGTHUNIT["metre",1]],'
f' AXIS["(N)",north,'
f" ORDER[2],"
f' LENGTHUNIT["metre",1]]],'
f'REMARK["Created by Cratermaker"]'
)
return CRS.from_wkt(wkt)
@property
def crs(self) -> CRS:
"""Return a geographic CRS (lon/lat in degrees) on a sphere using the target radius from the surface. Axis order is Lon/East, Lat/North."""
if self._crs is None:
self._crs = self.get_crs(radius=self.radius, name=self.target.name)
return self._crs
@property
def is_new(self) -> bool:
"""Whether this surface is newly created or has been loaded from disk and not reset."""
return self._is_new
@is_new.setter
def is_new(self, value: bool) -> None:
if not isinstance(value, bool):
raise TypeError("is_new must be a boolean value.")
self._is_new = value
return
[docs]
def data_composer(self) -> DataComposer:
"""
Creates a ``DataComposer`` tied to this ``Surface``. This should usually be called in a ``with`` statement.
Returns
-------
DataComposer
"""
return self._full().data_composer()
@property
def face_variables(self) -> list[str]:
"""
Returns a list of the data variables currently being stored on the faces.
"""
return self._full().face_variables
@property
def node_variables(self) -> list[str]:
"""
Returns a list of the data variables currently being stored on the nodes.
"""
return self._full().node_variables
[docs]
class LocalSurface(CratermakerBase):
"""
Generates a regional view of a subset of the surface mesh without making copies of any of the data.
Parameters
----------
surface : Surface
The surface object that contains the mesh data.
face_indices : NDArray | slice
The indices of the faces to include in the view.
node_indices : NDArray | slice | None, optional
The indices of the nodes to include in the view. If None, all nodes connected to the faces will be extracted when required
edge_indices : NDArray | slice | None, optional
The indices of the edges to include in the view. If None, all edges connected to the faces will be extracted when required.
location : tuple[float, float] | None, optional
The location of the center of the view in degrees. This is intended to be passed via the extract_region method of Surface.
region_radius : FloatLike | None, optional
The radius of the region to include in the view in meters. This is intended to be passed via the extract_region method of Surface.
reset : bool, optional
Flag to indicate whether to remove any existing data files in the output directory. Default is True.
**kwargs : Any
|kwargs|
"""
def __init__(
self,
surface: Surface,
face_indices: NDArray | slice,
node_indices: NDArray | slice | None = None,
edge_indices: NDArray | slice | None = None,
location: tuple[float, float] | None = None,
region_radius: FloatLike | None = None,
**kwargs: Any,
):
object.__setattr__(self, "_surface", Surface.maker(surface))
super().__init__(**{**kwargs, "simdir": self.surface.simdir})
object.__setattr__(self, "_uxds", None)
object.__setattr__(self, "_uxgrid", None)
object.__setattr__(self, "_face_indices", face_indices)
object.__setattr__(self, "_node_indices", node_indices)
object.__setattr__(self, "_edge_indices", edge_indices)
object.__setattr__(self, "_region_radius", region_radius)
object.__setattr__(self, "_location", None)
object.__setattr__(self, "_area", None)
object.__setattr__(self, "_n_edge", None)
object.__setattr__(self, "_n_face", None)
object.__setattr__(self, "_n_node", None)
object.__setattr__(self, "_face_distance", None)
object.__setattr__(self, "_node_distance", None)
object.__setattr__(self, "_face_bearing", None)
object.__setattr__(self, "_node_bearing", None)
object.__setattr__(self, "_edge_face_connectivity", None)
object.__setattr__(self, "_edge_node_connectivity", None)
object.__setattr__(self, "_face_edge_connectivity", None)
object.__setattr__(self, "_face_node_connectivity", None)
object.__setattr__(self, "_face_face_connectivity", None)
object.__setattr__(self, "_node_face_connectivity", None)
object.__setattr__(self, "_crs", None)
object.__setattr__(self, "_output_file_extension", "nc")
object.__setattr__(self, "_from_surface", None)
object.__setattr__(self, "_to_surface", None)
object.__setattr__(self, "_face_proj_x", None)
object.__setattr__(self, "_face_proj_y", None)
object.__setattr__(self, "_node_proj_x", None)
object.__setattr__(self, "_node_proj_y", None)
object.__setattr__(self, "_desloped_face_elevation", None)
self._output_dir_name = self.surface._output_dir_name
if location is not None: # This is a true LocalSurface object
self._location = validate_and_normalize_location(location)
self._output_file_prefix = "local_surface"
object.__setattr__(self, "_output_file_prefix", "local_surface")
object.__setattr__(self, "_grid_file_prefix", "local_grid")
self._output_file_pattern = [
f"{self._output_file_prefix}*.{self._output_file_extension}",
f"{self._grid_file_prefix}.{self._output_file_extension}",
]
else: # This is really a Surface object wearing a LocalSurface costume.
object.__setattr__(self, "_output_file_prefix", self.surface._output_file_prefix)
object.__setattr__(self, "_grid_file_prefix", self.surface._grid_file_prefix)
object.__setattr__(self, "_region_radius", self.surface.radius)
self._output_file_pattern = self.surface._output_file_pattern
return
def __str__(self) -> str:
"""
String representation of the LocalSurface object.
"""
str_repr = "<LocalSurface>\n"
if self.is_local:
str_repr += f"Location: {self.location[0]:.2f}°, {self.location[1]:.2f}°\n"
if self.region_radius:
str_repr += f"Region Radius: {format_large_units(self.region_radius, quantity='length')}\n"
str_repr += f"Number of faces: {self.n_face}\n"
str_repr += f"Number of nodes: {self.n_node}\n"
return str_repr
def __getattr__(self, name: str):
"""
Get attribute from the surface's uxds.
If the attribute is face-, node-, or edge-based, slice it to only include the faces, nodes, or edges in the local surface.
Parameters
----------
name : str
Name of the attribute to get.
"""
uxds = self.surface.uxds
if name in uxds:
arr = uxds[name].values
if arr.ndim >= 1:
if arr.shape[0] == self.surface.n_face:
return arr[self.face_indices, ...]
elif arr.shape[0] == self.surface.n_node:
return arr[self.node_indices, ...]
elif arr.shape[0] == self.surface.n_edge:
return arr[self.edge_indices, ...]
else:
raise AttributeError(
f"Cannot match attribute {name!r} with shape {arr.shape} to face, node, or edge data of the surface"
)
return arr
raise AttributeError(f"{type(self).__name__!s} has no attribute {name!r}")
[docs]
def add_data(
self,
name: str,
data: FloatLike | NDArray,
long_name: str | None = None,
units: str | None = None,
isfacedata: bool = True,
overwrite: bool = False,
fill_value: float = 0.0,
dtype=np.float64,
positive_only: bool = False,
**kwargs,
) -> None:
"""
Adds new data to the surface.
Parameters
----------
name : str
Name of the data variable. This will also be used as the data file name.
data : scalar or array-like
Data file to be saved. If data is a scalar, then the data file will be filled with that value. If data is an array, then the data file will be filled with the array values. The data array must have the same size as the number of faces or nodes in the grid.
long_name : str, optional
Long name of the data variable that will be saved as an attribute if this is new data. If the data already exists on the surface, this will be ignored.
units : str, optional
Units of the data variable that will be saved as an attribute if this is new data. If the data already exists on the surface, this will be ignored.
isfacedata : bool, optional, default True
Flag to indicate whether the data is face data or node data. This is only needed if `data` is a scalar, otherwise it is ignored
overwrite : bool, optional, default False
By default, new data is added to the old data. This flag indicates that the data should be overwritten, replacing any old data with the new data.
fill_value : float, optional
The fill value to use for new data variables. Default is 0.0.
dtype : data-type, optional
The data type of the data variable. Default is np.float64.
positive_only: bool, optional
If True, only allow positive values on the data (data will be clipped at 0.0). Default is False.
**kwargs
|kwargs|
"""
# Check if the data is a scalar or an array
if np.isscalar(data):
n = self.n_face if isfacedata else self.n_node
data = np.full(n, data, dtype=dtype)
elif isinstance(data, list):
data = np.array(data, dtype=dtype)
else:
data = np.asarray(data, dtype=dtype)
if data.size == self.n_face:
isfacedata = True
indices = self.face_indices
elif data.size == self.n_node:
isfacedata = False
indices = self.node_indices
else:
raise ValueError("data must be a scalar or an array with the same size as the number of faces or nodes in the grid")
if name not in self.surface.uxds.data_vars:
overwrite = True
self.surface._add_new_data(
name, data=fill_value, long_name=long_name, units=units, isfacedata=isfacedata, dtype=dtype, **kwargs
)
if overwrite:
self.surface.uxds[name].data[indices] = data
else:
self.surface.uxds[name].data[indices] += data
if positive_only:
self.surface.uxds[name].data[indices] = np.maximum(0.0, self.surface.uxds[name].data[indices])
return
[docs]
def update_elevation(
self,
new_elevation: ArrayLike | FloatLike,
overwrite: bool = False,
**kwargs: Any,
) -> None:
"""
Update the elevation data for the target's surface mesh. This method will determine whether to update the node or face data (or both) depending on the size of the input data. If a scalar is passed, both node and face elevation will be updated to that value. By default, the elevation data will be added to the existing data. If `overwrite` is set to True, the existing data will be replaced with the new data.
Parameters
----------
new_elevation : ArrayLike | FloatLike
Elevation to be added (or replaced, if overwrite is True). This can be a scalar, an array with the same size as the number of faces, an array with the same size as the number of nodes, or an array with the same size as the number of faces + nodes.
overwrite : bool, optional
If True, the existing data will be replaced with the new data. Default is False.
**kwargs : Any
|kwargs|
Notes
-----
When passing combined data, the first part of the array will be used for face elevation and the second part for node elevation.
"""
def raise_invalid_elevation_error():
raise ValueError(
"new_elev must be None, a scalar, or an array with the same size as the number of nodes, faces, or nodes+faces"
)
try:
new_elevation = np.asarray(new_elevation)
if np.asarray(new_elevation).size == 1:
update_node = True
update_face = True
new_face_elev = np.full(self.n_face, new_elevation.item())
new_node_elev = np.full(self.n_node, new_elevation.item())
else:
if new_elevation.size == self.n_face:
update_face = True
update_node = False
new_face_elev = new_elevation
elif new_elevation.size == self.n_node:
update_face = False
update_node = True
new_node_elev = new_elevation
elif new_elevation.size == self.n_face + self.n_node:
update_face = True
update_node = True
new_face_elev = new_elevation[: self.n_face]
new_node_elev = new_elevation[self.n_face :]
else:
raise_invalid_elevation_error()
except Exception as e:
raise ValueError(
"new_elev must be None, a scalar, or an array with the same number of elements as either the faces or nodes of the surface mesh."
) from e
if update_face:
self.add_data(name="face_elevation", data=new_face_elev, overwrite=overwrite)
if not update_node:
self.interpolate_node_elevation_from_faces()
if update_node:
self.add_data(name="node_elevation", data=new_node_elev, overwrite=overwrite)
return
[docs]
def add_tag(
self,
name: str,
tag: int | None = None,
long_name: str | None = None,
**kwargs: Any,
) -> None:
"""
Adds an integer tag to the surface.
Used primarily for tracking crater ids.
Parameters
----------
name : str
The name of the tag to be added.
tag : int | None
The integer to apply the tag. If None is provided, the tag layers will be reset to zero
long_name : str | None
The long name of the tag to be added. If None, no long name will be added.
**kwargs : Any
|kwargs|
"""
# Reset the tag layers if the tag is None or does not yet exist on the surface
if name not in self.uxds or tag is None:
dims = ("n_face", "layer")
data = np.zeros((self.surface.n_face, _N_TAG_LAYERS), dtype=np.uint32)
if long_name is not None:
attrs = {"long_name": long_name}
else:
attrs = None
uxda = uxr.UxDataArray(
data=data,
dims=dims,
name=name,
attrs=attrs,
uxgrid=self.surface.uxgrid,
)
self.surface._uxds[name] = uxda
if tag is None:
return
insert_layer = -1
# Find the first layer that contains a contiguous set of empty spots
for attempt in range(2):
for i in range(_N_TAG_LAYERS):
if np.all(self.uxds[name].data[:, i] == 0):
insert_layer = i
break
if insert_layer == -1: # If at first we don't succed, try sorting the tags and try again.
if attempt == 0:
self.sort_tags(name)
break
else:
# TODO: Implement an option to expand the number of layers if all layers are full
warnings.warn(f"All {name} layers are full. Overwriting the first layer.", stacklevel=2)
insert_layer = 0
data = self.uxds[name].data
data[:, insert_layer] = tag
self.surface.uxds[name].data[self.face_indices, :] = data
self._desloped_face_elevation = None # Invalidate the desloped elevation cache since tags are often used for tracking craters, and we want to be able to get the original elevation of the crater faces if we need to.
return
[docs]
def remove_tag(self, name: str, tag: int) -> None:
"""
Removes an integer tag from the local surface.
Parameters
----------
tag : int
The integer tag to be removed.
"""
if name not in self.uxds:
raise ValueError(f"Tag {name} does not exist on the surface")
data = self.uxds[name].data
if tag in data:
data[data == tag] = 0
self.surface.uxds[name].data[self.face_indices, :] = data
return
[docs]
def apply_diffusion(self, kdiff: FloatLike | NDArray) -> None:
"""
Apply diffusion to the surface.
Parameters
----------
kdiff : float or array-like
The degradation state of the surface, which is the product of diffusivity and time. It can be a scalar or an array of the same size as the number of faces in the grid.
If it is a scalar, the same value is applied to all faces. If it is an array, it must have the same size as the number of faces in the grid.
The value of kdiff must be greater than 0.0.
"""
if np.isscalar(kdiff):
kdiff = np.full(self.n_face, kdiff)
elif kdiff.size != self.n_face:
raise ValueError("kdiff must be a scalar or an array with the same size as the number of faces in the grid")
if np.any(kdiff < 0.0):
raise ValueError("kdiff must be greater than 0.0")
kdiff = np.asarray(kdiff, dtype=np.float64)
kdiffmax = np.max(kdiff)
if abs(kdiffmax) < _VSMALL:
return
delta_face_elevation = surface_bindings.apply_diffusion(face_kappa=kdiff, face_variable=self.face_elevation, region=self)
self.update_elevation(delta_face_elevation)
self.add_data("ejecta_thickness", long_name="ejecta thickness", units="m", data=delta_face_elevation, positive_only=True)
return
[docs]
def slope_collapse(self, critical_slope_angle: FloatLike = 35.0) -> NDArray:
"""
Collapse all slopes larger than the critical slope angle.
Parameters
----------
critical_slope_angle : float
The critical slope angle (angle of repose) in degrees.
"""
try:
critical_slope = np.tan(np.deg2rad(critical_slope_angle))
except ValueError as e:
raise ValueError("critical_slope_angle must be between 0 and 90 degrees") from e
delta_face_elevation = surface_bindings.slope_collapse(critical_slope=critical_slope, region=self)
self.update_elevation(delta_face_elevation)
self.add_data("ejecta_thickness", long_name="ejecta thickness", units="m", data=delta_face_elevation, positive_only=True)
[docs]
def compute_slope(self) -> NDArray[np.float64]:
"""
Compute the slope of the surface.
Returns
-------
NDArray[np.float64]
The slope of all faces in degrees.
"""
slope = surface_bindings.compute_slope(
face_elevation=self.face_elevation,
region=self,
)
return np.rad2deg(np.arctan(slope))
[docs]
def apply_noise(
self,
model: str = "turbulence",
noise_width: FloatLike = 1000e3,
noise_height: FloatLike = 1e3,
**kwargs: Any,
) -> None:
"""
Apply noise to the node elevations of the surface view.
Parameters
----------
noise_width : float
The spatial wavelength of the noise.
noise_height : float
The amplitude of the noise.
"""
num_octaves = kwargs.pop("num_octaves", 12)
anchor = kwargs.pop(
"anchor",
self.surface.rng.uniform(0, 2 * np.pi, size=(num_octaves, 3)),
)
x = np.concatenate([self.face_x, self.node_x]) / self.surface.radius
y = np.concatenate([self.face_y, self.node_y]) / self.surface.radius
z = np.concatenate([self.face_z, self.node_z]) / self.surface.radius
if model == "turbulence":
noise = surface_bindings.turbulence_noise(
x=x,
y=y,
z=z,
noise_height=noise_height / self.surface.radius,
noise_width=noise_width / self.surface.radius,
freq=2.0,
pers=0.5,
anchor=anchor,
seed=self.surface.rng.integers(0, 2**32 - 1),
)
else:
raise ValueError(f"Unknown noise model: {model}")
# Compute the weighted mean to ensure volume conservation
mean = np.sum(noise[: self.n_face] * self.face_area) / self.area
noise -= mean
self.update_elevation(noise)
return
[docs]
def calculate_face_and_node_distances(
self, location: tuple[float, float] | None = None, validate: bool = True
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""
Computes the distances between nodes and faces and a given location.
Parameters
----------
location : tuple[float, float], option
tuple containing the longitude and latitude of the location in degrees. If None, the location of the view center is used if it is set.
validate : bool, optional
If the location should be validated and normalized. Only use if the validation is causing a performance bottleneck.
Returns
-------
NDArray
Array of face distances in meters.
NDArray
Array of node distances in meters.
"""
if location is None:
if self.is_local:
location = self.location
validate = False
else:
raise ValueError("A value for location must be provided for global surfaces.")
if len(location) == 1:
location = location.item()
if len(location) != 2:
raise ValueError("location must be a single pair of (longitude, latitude).")
if validate:
location = validate_and_normalize_location(location)
node_locations = np.vstack((self.node_lon, self.node_lat)).T
face_locations = np.vstack((self.face_lon, self.face_lat)).T
return self.compute_distances(
locations=face_locations, reference_location=location, validate=False
), self.compute_distances(locations=node_locations, reference_location=location, validate=False)
[docs]
def calculate_face_and_node_bearings(self, location: tuple[float, float] | None = None) -> tuple[NDArray, NDArray]:
"""
Computes the initial bearing between nodes and faces and a given location.
Parameters
----------
location : tuple[float, float]
tuple containing the longitude and latitude of the location in degrees.
Returns
-------
NDArray
Array of initial bearings for each face in degrees.
NDArray
Array of initial bearings for each node in degrees.
"""
if location is None:
if self.is_local:
location = self.location
else:
raise ValueError("A value for location must be provided for global surfaces.")
if len(location) == 1:
location = location.item()
if len(location) != 2:
raise ValueError("location must be a single pair of (longitude, latitude).")
location = validate_and_normalize_location(location)
face_bearing, node_bearing = (
surface_bindings.compute_bearings(
lon1=np.radians(location[0]),
lat1=np.radians(location[1]),
lon2=np.radians(self.face_lon),
lat2=np.radians(self.face_lat),
),
surface_bindings.compute_bearings(
lon1=np.radians(location[0]),
lat1=np.radians(location[1]),
lon2=np.radians(self.node_lon),
lat2=np.radians(self.node_lat),
),
)
return np.degrees(face_bearing), np.degrees(node_bearing)
[docs]
def interpolate_node_elevation_from_faces(self) -> None:
"""
Update node elevations by area-weighted averaging of adjacent face elevations.
For each node, the elevation is computed as the area-weighted average of the elevations
of the surrounding faces.
Returns
-------
None
"""
node_elevation = surface_bindings.interpolate_node_elevation_from_faces(region=self)
self.add_data(name="node_elevation", data=node_elevation, overwrite=True)
return
[docs]
def get_reference_surface(
self, reference_radius: float | None = None, only_faces: bool = False, **kwargs: Any
) -> NDArray[np.float64]:
"""
Calculate the orientation of a hemispherical cap that represents the average surface within a given region.
Parameters
----------
reference_radius : float
The radius of the reference region to compute the average over in meters. If None, the region_radius of the LocalSurface is used.
only_faces : bool
If True, only return the face elevation data of the reference surface. Default is False.
**kwargs : Any
|kwargs|
Returns
-------
NDArray[np.float64]
The face and node elevation points of the reference sphere, or the original elevation points is the reference region is too small
"""
def _find_reference_elevations(x: NDArray, y: NDArray, z: NDArray) -> NDArray:
"""
Compute the mean plane that fits the points given by the projected x, projected y, and elevation array.
Parameters
----------
region_points : NDArray
An array of shape (n, 3) where each row contains [x, y, elevation].
Returns
-------
NDArray
An array of elevation values corresponding to the mean plane for each projected x and y point.
"""
# Create the design matrix for the plane fitting
A = np.vstack([x, y, np.ones(len(x))]).T
# Solve for the coefficients of the plane (Ax + By + C = z)
coeffs, _, _, _ = np.linalg.lstsq(A, z, rcond=None)
return coeffs[0] * x + coeffs[1] * y + coeffs[2]
if reference_radius is None:
reference_radius = self.region_radius
# Find cells within the crater radius
faces_within_region = self.face_distance <= reference_radius
if only_faces:
points_within_region = faces_within_region
elevation = self.face_elevation
n_reference = np.sum(faces_within_region)
else:
elevation = np.concatenate([self.face_elevation, self.node_elevation])
nodes_within_region = self.node_distance <= reference_radius
if not nodes_within_region.any():
return elevation
points_within_region = np.concatenate([faces_within_region, nodes_within_region])
n_reference = np.sum(faces_within_region) + np.sum(nodes_within_region)
# if there are not points within the radius, return the original elevations and use that as the reference
if n_reference < 5:
return elevation
self.set_face_proj()
if only_faces:
x, y, z = self.face_proj_x, self.face_proj_y, self.face_elevation
else:
self.set_node_proj()
x = np.concatenate([self.face_proj_x, self.node_proj_x])
y = np.concatenate([self.face_proj_y, self.node_proj_y])
z = np.concatenate([self.face_elevation, self.node_elevation])
reference_elevation = elevation
reference_elevation[points_within_region] = _find_reference_elevations(
x[points_within_region], y[points_within_region], z[points_within_region]
)
return reference_elevation
[docs]
def compute_volume(self, elevation: NDArray) -> NDArray:
"""
Compute the volume of an array of elevation points.
Parameters
----------
elevation : NDArray
The elevation values to compute. This should be the same size as the number of faces in the grid.
Returns
-------
float
The volume of the elevation points
"""
if elevation.size != self.n_face:
raise ValueError("elevation must be an array with the same size as the number of faces in the grid")
return np.sum(elevation * self.face_area)
[docs]
def elevation_to_cartesian(self, element="face") -> NDArray[np.float64]:
"""
Convert either the face or node elevations to Cartesian coordinates.
Parameters
----------
element : str, optional
The type of element to convert. Can be "face" or "node". Default is "face".
Returns
-------
NDArray[np.float64, 3]
The Cartesian coordinates of the face elevations.
"""
if element not in ("face", "node"):
raise ValueError("element must be either 'face' or 'node'")
if element == "face":
position = np.column_stack((self.face_x, self.face_y, self.face_z))
elevation = self.face_elevation
elif element == "node":
position = np.column_stack((self.node_x, self.node_y, self.node_z))
elevation = self.node_elevation
return self.surface._compute_elevation_to_cartesian(position, elevation)
[docs]
def compute_radial_gradient(self, variable: str | NDArray[np.float64]) -> NDArray[np.float64]:
"""
Compute the radial gradient of the local surface variable with respect to the local_location center.
Parameters
----------
variable : str or NDArray[np.float64]
The variable to compute the radial gradient of. This can be a string (the name of a variable in the surface data) or an array of values the same size as the number of faces in the grid.
Returns
-------
NDArray[np.float64]
The radial gradient of all faces in meters per meter.
"""
if isinstance(variable, str):
if variable not in self.uxds:
raise ValueError(f"Variable {variable} not found in the surface data.")
variable = self.uxds[variable].data
elif variable.size != self.n_face:
raise ValueError("variable must be a string or an array with the same size as the number of faces in the grid")
radial_gradient = surface_bindings.compute_radial_gradient(
variable=variable,
region=self,
)
return radial_gradient
[docs]
def export(
self,
driver: str = "GPKG",
interval: int | None = None,
ask_overwrite: bool | None = None,
**kwargs: Any,
) -> None:
"""
Export the surface view data to a specified file format and stores it in the default export directory.
If the format is "VTK," the data will be exported using the :py:meth:`Surface.to_vtk` method. Otherwise, it will use the :py:meth:`Surface.to_vector_file` to export the data.
Parameters
----------
driver : str, optional
The driver to use export the data to. Supported formats are 'VTK' or a driver supported by GeoPandas ('GPKG', 'ESRI Shapefile', etc.), and 'GeoTIFF'.
interval : int | None, optional
|interval_export|
ask_overwrite : bool | None, optional
|ask_overwrite_methods|
**kwargs : Any
|kwargs|
"""
from cratermaker.constants import VECTOR_DRIVER_TO_EXTENSION_MAP
# Temporarily set the ask_overwrite attribute for the duration of the export, but reset it to its original value afterwards.
ask_overwrite_orig = self.ask_overwrite
if ask_overwrite is not None:
self.ask_overwrite = ask_overwrite
if interval is not None:
self.surface.save(interval=interval, **kwargs, skip_actions=True)
if driver.upper() in ["VTK", "VTP"]:
self.to_vtk_file(
interval=interval,
**kwargs,
)
elif driver.upper() in ["GEOTIFF", "GTIFF", "TIFF", "TIF"]:
self.to_geotiff_file(
interval=interval,
**kwargs,
)
elif driver.upper() in VECTOR_DRIVER_TO_EXTENSION_MAP:
self.to_vector_file(
driver=driver,
interval=interval,
**kwargs,
)
self.ask_overwrite = ask_overwrite_orig
return
[docs]
def to_vector_file(
self,
driver: str = "GPKG",
interval: int | None = None,
to_file_kwargs: dict | None = None,
**kwargs: Any,
) -> None:
"""
Export the face-associated data from the surface view data to a vector file using GeoPandas.
See `geopandas.GeoDataFrame.to_file <https://geopandas.org/en/stable/docs/reference/api/geopandas.GeoDataFrame.to_file.html>`_ for more detailed information on the available parameters.
Parameters
----------
driver : str, optional
The file format driver to use for exporting. Default is 'GPKG'.
interval : int | None, optional
|interval_export|
to_file_kwargs : dict | None, optional
Additional keyword arguments to pass to the GeoDataFrame.to_file method.
**kwargs : Any
|kwargs|
"""
from cratermaker.constants import VECTOR_DRIVER_TO_EXTENSION_MAP
def _write_dataset(uxds, filename, layer_name, driver, **kwargs):
if self.is_global:
# Exclude periodic elements for global surfaces. For some reason, UxArray gets the windings wrong when using the split argument, so we have to exclude instead.
gdfargs = {"engine": "geopandas", "periodic_elements": "exclude"}
else:
gdfargs = {"engine": "geopandas", "periodic_elements": "ignore"}
# Convert to GeoDataFrame and set the CRS correctly for the type of surface
gdf = uxds.uxgrid.to_geodataframe(**gdfargs).set_crs(self.surface.crs).to_crs(self.crs)
variables = [v for v in uxds.data_vars if any(dim == "n_face" for dim in uxds[v].dims)]
if not variables:
raise ValueError("No face-based variables found to export to file.")
# Shapefile can't store the crater id layers, so we remove variables with the pattern "idXX"
if driver == "ESRI Shapefile":
variables = [var for var in variables if not var.startswith("id")]
for var in variables:
if len(uxds[var].dims) == 1:
gds = uxds[var].to_geodataframe(**gdfargs)
gdf[var] = gds[var]
elif "layer" in uxds[var].dims:
for layer in range(uxds.layer.size):
gds = uxds[var].isel(layer=layer).to_geodataframe(**gdfargs)
gdf[f"{var}{layer:02d}"] = gds[var]
# Rename columns if exporting to Shapefile to comply with field name length limits
if driver == "ESRI Shapefile":
gdf = gdf.rename(columns={col: shp_key_fix(col) for col in gdf.columns})
print(f"Exporting to {filename} using driver {driver}")
if not self._overwrite_check(filename):
return
gdf.to_file(filename, layer=layer_name, driver=driver, **kwargs)
return
def shp_key_fix(key: str) -> str:
"""
ESRI Shapefile format limits field names to 10 characters, so this function substitues longer names with shorter alternatives, truncates the results, and sets them to upper case.
"""
alt_names = {
"face_elevation": "faceelev",
"face_distance": "facedis",
"face_bearing": "facebearng",
"face_indices": "faceindx",
}
for long, short in alt_names.items():
if long in key:
key = key.replace(long, short)
return key[:10].upper()
# Common alias for Shapefile
if driver.upper() == "SHP":
driver = "ESRI Shapefile"
if driver.upper() in VECTOR_DRIVER_TO_EXTENSION_MAP:
file_extension = VECTOR_DRIVER_TO_EXTENSION_MAP[driver.upper()]
else:
raise ValueError("Cannot infer file extension from driver {driver}.")
uxds = self.read_saved_output(interval=interval, reset=False)
interval_numbers = uxds.interval.values
for interval in interval_numbers:
uxdsi = uxds.sel(interval=interval).load()
filename = self.export_dir / self.output_filename(interval).replace(self.output_file_extension, file_extension)
_write_dataset(
uxdsi,
filename=filename,
layer_name="face_data",
driver=driver,
**to_file_kwargs if to_file_kwargs is not None else {},
)
return
[docs]
def to_vtk_mesh(self, uxds: UxDataset | None = None, interval: int | None = None, **kwargs: Any) -> vtkUnstructuredGrid:
"""
Exports the regional mesh to a VTK PolyData object.
Parameters
----------
uxds : UxDataset, optional
The dataset to export. If None, the method will use currently loaded data in the surface. Default is None.
interval : int, optional
The interval number to export. If provided, the method will either extract the interval number from uxds (if it has intervals saved), or, if no uxds is passed, load a saved interval from file.
**kwargs : Any
|kwargs|
Returns
-------
vtkUnstructuredGrid
The VTK PolyData object representing the regional mesh.
"""
from vtk import (
VTK_POLYGON,
vtkPoints,
)
from vtkmodules.util.numpy_support import numpy_to_vtk
from vtkmodules.vtkFiltersCore import vtkPolyDataNormals
from vtkmodules.vtkFiltersGeometry import vtkGeometryFilter
if uxds is None:
if interval is None:
uxds = self.uxds
else:
uxds = self.read_saved_output(interval=interval, reset=False).isel(interval=0)
interval = None
if interval is not None:
if "interval" not in uxds:
raise ValueError("uxds does not have an 'interval' variable")
uxds = uxds.isel(interval=interval)
node_xyz = np.c_[self.node_x, self.node_y, self.node_z]
node_normals = node_xyz / self.surface.radius
vtk_point_normals = numpy_to_vtk(node_normals.astype(np.float32), deep=True)
vtk_point_normals.SetNumberOfComponents(3)
vtk_point_normals.SetName("Normals")
# Warp the mesh according to node elevation if it exists
if "node_elevation" in uxds:
warped_xyz = node_xyz + uxds.node_elevation.data[:, None] * node_normals
else:
warped_xyz = node_xyz
node_x = warped_xyz[:, 0]
node_y = warped_xyz[:, 1]
node_z = warped_xyz[:, 2]
vtk_data = vtkUnstructuredGrid()
nodes = vtkPoints()
for i in range(self.n_node):
nodes.InsertNextPoint(node_x[i], node_y[i], node_z[i])
vtk_data.SetPoints(nodes)
vtk_data.Allocate(self.n_face)
for i, n in enumerate(self.n_nodes_per_face):
point_ids = self.face_node_connectivity[i][0:n]
vtk_data.InsertNextCell(VTK_POLYGON, n, point_ids)
grid = vtkUnstructuredGrid()
grid.DeepCopy(vtk_data)
for v in uxds.variables:
array = numpy_to_vtk(uxds[v].values, deep=True)
array.SetName(v)
if "n_face" in uxds[v].dims:
grid.GetCellData().AddArray(array)
elif "n_node" in uxds[v].dims:
grid.GetPointData().AddArray(array)
if v == "node_elevation":
grid.GetPointData().SetActiveScalars(v)
elif uxds[v].dims == ("time",) or uxds[v].size == 1:
grid.GetFieldData().AddArray(array)
geom_filter = vtkGeometryFilter()
geom_filter.SetInputData(grid)
geom_filter.Update()
poly_data = geom_filter.GetOutput()
poly_data.GetPointData().SetNormals(vtk_point_normals)
normals_filter = vtkPolyDataNormals()
normals_filter.SetInputData(poly_data)
normals_filter.ComputeCellNormalsOn()
normals_filter.Update()
mesh = normals_filter.GetOutput()
return mesh
[docs]
def to_vtk_file(
self,
interval: int | None = None,
**kwargs: Any,
) -> None:
"""
Export the regional mesh to a VTK file and store it in the default export directory.
Parameters
----------
interval : int, optional
|interval_export|
**kwargs : Any
|kwargs|
"""
def _write_current_mesh(mesh, output_filename):
from vtk import vtkXMLPolyDataWriter
writer = vtkXMLPolyDataWriter()
writer.SetDataModeToBinary()
writer.SetCompressorTypeToZLib()
if not self._overwrite_check(output_filename):
return
if output_filename.exists():
output_filename.unlink()
writer.SetFileName(output_filename)
writer.SetInputData(mesh)
print(f"Exporting to {output_filename}")
writer.Write()
return
uxds = self.read_saved_output(interval=interval, reset=False)
interval_numbers = uxds.interval.values
if interval is not None:
if interval < 0:
interval = interval_numbers[interval]
interval_numbers = [interval]
# Check if we need to save the geometry file
grid_filename = self.export_dir / f"{self._grid_file_prefix}.vtp"
if not grid_filename.exists():
grid = self.to_vtk_mesh(uxds=self.uxgrid.to_xarray())
_write_current_mesh(grid, grid_filename)
for interval in interval_numbers:
uxdsi = uxds.sel(interval=interval).load()
mesh = self.to_vtk_mesh(uxds=uxdsi)
filename = self.export_dir / self.output_filename(interval=interval).replace(self.output_file_extension, "vtp")
_write_current_mesh(mesh, filename)
return
[docs]
def to_raster(
self, uxda: UxDataArray | None = None, **kwargs: Any
) -> tuple[NDArray[np.float32], tuple[float, float, float, float], Any, CRS]:
"""
Rasterize a face-based variable into a 2D raster using rasterio.
Parameters
----------
uxda : UxDataArray | None
The UxDataArray containing the face-based variable to rasterize. If None, the method will use currently loaded data in the surface with "face_elevation" as the default variable. Default is None.
**kwargs : Any
|kwargs|
Returns
-------
raster : NDArray[np.float32]
The rasterized variable as a 2D numpy array.
extent : tuple[float, float, float, float]
The extent of the raster in the format (xmin, xmax, ymin, ymax).
transform : Affine
The affine transform for the raster.
crs : CRS
The coordinate reference system of the raster.
"""
# Check if rasterio is installed, and if not, just return without plotting
try:
from rasterio.features import rasterize
from rasterio.transform import Affine, from_bounds
except ImportError:
warnings.warn(
"rasterio is not installed. Cannot generate plot. On some platforms, you may need to install GDAL first before installing rasterio.",
stacklevel=2,
)
return
if uxda is None:
uxda = self.uxds["face_elevation"].load()
W, H = self.get_raster_dims()
if self.is_global:
# Splitting doesn't work well and makes a hash of the raster. So we'll just drop the periodic elements instead
gdf = uxda.to_geodataframe(engine="geopandas", periodic_elements="exclude").set_crs(self.crs)
xmin, xmax = -180.0, 180.0
ymin, ymax = -90.0, 90.0
# Get the approximate pixel size based on the number of faces
npix = self.surface.n_face
pix_area = 4.0 * np.pi * (self.surface.radius**2) / npix
pix = np.sqrt(pix_area)
deg_per_pix = 180.0 * pix / (np.pi * self.surface.radius)
xres = yres = deg_per_pix
transform = from_bounds(xmin, ymin, xmax, ymax, W, H)
else:
gdf = uxda.to_geodataframe(engine="geopandas", periodic_elements="ignore").set_crs(self.surface.crs).to_crs(self.crs)
pix = self.pix
R = (
self.region_radius + 4 * pix
) # Add a 4 * pix border to enswure that the pixels along the border of the local region are plotted in full
xmin, xmax = -R, R
ymin, ymax = -R, R
# pixel size (meters/pixel)
xres = yres = (2.0 * R) / W
# upper-left at (-R, +R); y increases downward in rasters
transform = Affine.translation(-R, R) * Affine.scale(xres, -yres)
vals = gdf[uxda.name].to_numpy()
geoms = gdf.geometry.values
shapes = [
(geom, float(val))
for geom, val in zip(geoms, vals, strict=False)
if (geom is not None) and (not geom.is_empty) and np.isfinite(val)
]
extent = (xmin, xmax, ymin, ymax)
raster = rasterize(
shapes=shapes,
out_shape=(H, W),
transform=transform,
fill=np.nan,
dtype=np.float32,
all_touched=True,
)
return raster, extent, transform, gdf.crs
[docs]
def get_raster_dims(self) -> tuple[int, int]:
"""
Get the dimensions of the raster based on the region radius (or target, if this is a global surface) and pixel size.
Returns
-------
tuple[int, int]
The width and height of the raster in pixels.
"""
if self.is_global:
# Splitting doesn't work well and makes a hash of the raster. So we'll just drop the periodic elements instead
xmin, xmax = -180.0, 180.0
ymin, ymax = -90.0, 90.0
# Get the approximate pixel size based on the number of faces
npix = self.surface.n_face
pix_area = 4.0 * np.pi * (self.surface.radius**2) / npix
pix = np.sqrt(pix_area)
deg_per_pix = 180.0 * pix / (np.pi * self.surface.radius)
xres = yres = deg_per_pix
W = int(np.ceil((xmax - xmin) / xres))
H = int(np.ceil((ymax - ymin) / yres))
else:
R = self.region_radius
xmin, xmax = -R, R
ymin, ymax = -R, R
pix = self.pix
W = H = int(max(1, np.ceil((2.0 * R) / pix)))
return W, H
[docs]
def to_geotiff_file(
self,
interval: int | None = None,
variable_name: str = "face_elevation",
**kwargs,
) -> None:
"""
Rasterize a face-based elevation variable into a GeoTIFF using rasterio.
Parameters
----------
interval : int, optional
|interval_export|
variable_name : str, optional
The name of the variable to rasterize. Default is "face_elevation".
"""
import matplotlib.pyplot as plt
import rasterio as rio
from cartopy import crs as ccrs
def _write_dataset(uxda, output_filename, **kwargs):
if self.is_global:
projection = ccrs.PlateCarree()
else:
projection = ccrs.AzimuthalEquidistant(central_longitude=self.location[0], central_latitude=self.location[1])
raster, extent, transform, crs = self.to_raster(uxda, **kwargs)
H, W = raster.shape
fig = plt.figure(figsize=(W, H), dpi=1)
ax = fig.add_axes([0, 0, 1, 1], projection=projection)
ax.set_axis_off()
fig.subplots_adjust(left=0, right=1, bottom=0, top=1)
ax.set_extent(extent, crs=projection)
profile = {
"driver": "GTiff",
"height": H,
"width": W,
"count": 1,
"dtype": raster.dtype,
"crs": crs,
"transform": transform,
"nodata": np.nan,
}
print(f"Exporting to {output_filename}")
with rio.open(output_filename, "w", **profile) as dst:
dst.write(raster, 1)
return
# load data and select the face-based variables
uxds = self.read_saved_output(interval=interval, reset=False)
interval_numbers = uxds.interval.values
file_extension = "tif"
for interval in interval_numbers:
uxda = uxds.sel(interval=interval)[variable_name].load()
filename = self.export_dir / f"{self._output_file_prefix}{interval:06d}.{file_extension}"
_write_dataset(
uxda,
filename,
**kwargs,
)
return
[docs]
def save(
self,
interval: int = 0,
time_variables: dict | None = None,
filename: Path | str | None = None,
**kwargs,
) -> None:
"""
Save the region surface data to the specified directory.
Each data variable is saved to a separate NetCDF file. If 'time_variables' is specified, then a one or more variables will be added to the dataset along the time dimension.
Parameters
----------
interval : int, optional
Interval number to append to the data file name. Default is 0.
time_variables : dict, optional
Dictionary containing one or more variable name and value pairs. These will be added to the dataset along the time dimension. Default is None.
filename : Path | str, optional
The path to the output file. If None, the file will be saved to the default
**kwargs : Any
|kwargs|
"""
self.surface.output_dir.mkdir(parents=True, exist_ok=True)
if interval is None:
interval = 0
if time_variables is not None and not isinstance(time_variables, dict):
raise TypeError("time_variables must be a dictionary")
# Sort any layers that we might have before saving
for var in self.uxds.data_vars:
if "layer" in self.uxds[var].dims:
self.sort_tags(name=var)
self.uxds.close()
ds = self.uxds.expand_dims(dim="interval").assign_coords({"interval": [interval]})
if time_variables is not None:
for k, v in time_variables.items():
ds[k] = xr.DataArray(data=[v], name=k, dims=["interval"], coords={"interval": [interval]})
self.surface._save_data(
ds,
interval=interval,
filename=filename,
)
if self.is_local: # Save the local grid if this is a local surface
self._write_grid_file()
save_args = {
"interval": interval,
"time_variables": time_variables,
"filename": filename,
**kwargs,
}
super().save(**save_args)
return
def _write_grid_file(self, uxgrid: uxr.Grid | None = None, grid_file: Path | str | None = None, **kwargs: Any) -> None:
"""
Write the grid to a NetCDF file.
Parameters
----------
uxgrid : uxr.Grid, optional
The grid to be written to the file. If None, the grid will be obtained from the surface object. Default is None.
grid_file : Path, str, optional
The path to the grid file. If None, the path will be obtained from the surface object. Default is None.
**kwargs : Any
|kwargs|
"""
if uxgrid is None:
uxgrid = self.uxgrid
if grid_file is None:
grid_file = self.surface.output_dir / f"{self._grid_file_prefix}.{self._output_file_extension}"
else:
grid_file = Path(grid_file)
self.surface._write_grid_file(uxgrid=uxgrid, grid_file=grid_file, **kwargs)
return
[docs]
def plot(
self,
filename: Path | str | None = None,
plot_style: Literal["map", "hillshade"] = "map",
variable_name: str | None = None,
interval: int | None = None,
cmap: str | None = None,
label: str | None = None,
scalebar: bool | None = None,
colorbar: bool = True,
show: bool = True,
save: bool = False,
ax: Axes | None = None,
close_when_done: bool | None = None,
minimum_plot_width: float | None = 800,
**kwargs: Any,
) -> Axes:
"""
Plot an image of the local region.
Parameters
----------
filename : Path | str | None, optional
The path to save the plot to. If None, and save is True, the plot will be saved to the default plot directory with a filename based on the interval number. Default is None.
plot_style : str, optional
The style of the plot. Options are "map" and "hillshade". In "map" mode, the variable is displayed as a colored map. In "hillshade" mode, a hillshade image is generated using "face_elevation" data. If a different variable is passed to `variable`, then the hillshade will be overlayed with that variable's data. Default is "map".
variable_name : str | None, optional
The variable to plot. If None is provided then "face_elevation" is used in "map" mode.
interval: int | None, optional
The interval number of the surface to plot. If None, the currently loaded surface data will be used.
cmap : str, optional
The colormap to use for the plot. If None, a default colormap will be used ("cividis" by default and "grey" when plot_style=="hillshade" and variable=="face_elevation").
label : str | None, optional
A label for the plot. If None, no label will be added.
scalebar : bool, optional
If True, a scalebar will be added to the plot. Default is True.
colorbar : bool, optional
If True, a colorbar will be added to the plot when using "map" plot_style or "hillshade" with a variable overlay. Default is True.
show : bool, optional
If True, the plot will be displayed. Default for local surfaces is True.
save : bool, optional
If True, the plot will be saved to the default plot directory. Default is False unless filename is provided, in which case the default is True.
ax : matplotlib.axes.Axes, optional
An existing Axes object to plot on. If None, a new figure and axes will be created.
close_when_done : bool, optional
If True, the figure will be closed after plotting. Default is True when save is True and show is False, and False otherwise.
minimum_plot_width : float, optional
Because the width of the plot is determined by the number of faces, small regions will generate small plots with labels that are hard to read. This parameter sets a lower limit to the width of the image that is generated by the plot. By default it is 800. Set to None to turn it off.
**kwargs : Any
|kwargs|
Returns
-------
matplotlib.axes.Axes
A Matplotlib Axes object containing the plot.
"""
import matplotlib.pyplot as plt
from matplotlib import rc_context, rcParams
from matplotlib.colors import LightSource, Normalize
from scipy.ndimage import gaussian_filter
if scalebar is None:
scalebar = True
interval_numbers = self.surface.get_saved_interval_numbers()[0]
if interval is None or interval == interval_numbers[-1]:
uxds = self.uxds
else:
uxds = self.read_saved_output(interval=interval, reset=False)
interval = uxds.interval.values.item()
uxds = uxds.sel(interval=interval)
if variable_name is not None and variable_name not in uxds:
raise ValueError(f"Variable '{variable_name}' not found in the surface data.")
if variable_name is None and plot_style == "map":
variable_name = "face_elevation"
if plot_style == "hillshade":
if variable_name is None:
do_overlay = False
else:
do_overlay = True
if close_when_done is None:
close_when_done = save and not show
colorbar = colorbar and (plot_style == "map" or do_overlay)
if variable_name is not None:
ret = self.to_raster(uxds[variable_name].load())
variable_long_name = uxds[variable_name].attrs.get("long_name", variable_name)
variable_units = uxds[variable_name].attrs.get("units", "")
variable_raster = ret[0]
extent = ret[1]
H, W = variable_raster.shape
vmin = kwargs.pop("vmin", np.nanmin(variable_raster))
vmax = kwargs.pop("vmax", np.nanmax(variable_raster))
norm = Normalize(vmin=vmin, vmax=vmax)
else:
variable_long_name = ""
variable_units = ""
vmin = kwargs.pop("vmin", 0.0)
vmax = kwargs.pop("vmax", 1.0)
if plot_style == "hillshade":
hill_args = {"dx": self.pix, "dy": self.pix, "fraction": 1.0}
ret = self.to_raster(uxds["face_elevation"].load())
elevation = ret[0]
extent = ret[1]
elevation = gaussian_filter(elevation, sigma=2, mode="constant", cval=np.nan)
H, W = elevation.shape
azimuth = 300.0
solar_angle = 20.0
ls = LightSource(azdeg=azimuth, altdeg=solar_angle)
if do_overlay:
if cmap is None:
cmap = "cividis"
cmap = plt.get_cmap(cmap)
variable_raster = np.clip((variable_raster - vmin) / (vmax - vmin), 0.0, 1.0)
rgb = cmap(variable_raster)
blended = ls.shade_rgb(rgb, elevation, blend_mode="overlay", **hill_args)
if np.any(np.isnan(variable_raster[~np.isnan(elevation)])):
hillshade = ls.hillshade(elevation, **hill_args)
graymap = plt.get_cmap("gray")
hillshade = graymap(hillshade)
cvals = np.where(blended == 0, hillshade, blended)
else:
cvals = blended
else:
if cmap is None:
cmap = "gray"
cvals = ls.hillshade(elevation, **hill_args)
interpolation = kwargs.pop("interpolation", "lanczos")
elif plot_style == "map":
if cmap is None:
cmap = "cividis"
cvals = variable_raster
interpolation = kwargs.pop("interpolation", "bicubic")
else:
raise ValueError("plot_style must be either 'map' or 'hillshade'")
if minimum_plot_width is not None:
dpi = max(minimum_plot_width, W)
# Plot with (1, 1) inch figure and dpi=resolution for exact pixel size
if ax is None:
_, ax = plt.subplots(figsize=(1, 1), dpi=dpi, frameon=False, layout="constrained")
# This helps scale plot elements that are sized in points, because we're using a somewhat non-standard dpi and figure size to plot the rasterio-genrated surface data
scale_fraction = 72 / dpi
scaled_params = {
k: v * scale_fraction
for k, v in rcParams.items()
if v is not None and type(v) is float and ("pad" in k or "width" in k or "size" in k)
}
scaled_params["font.size"] = 24 * scale_fraction
with rc_context(scaled_params):
ax.imshow(cvals, interpolation=interpolation, cmap=cmap, vmin=vmin, vmax=vmax, aspect="equal", extent=extent)
ax.axis("off")
xmin, xmax, ymin, ymax = extent
# Add scale bar before saving/showing image
if scalebar and self.is_local:
# Determine max physical size for the scale bar
max_physical_size = xmax / 2 / np.sqrt(2)
# Choose "nice" scale bar length
nice_values = np.array([1, 10, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000]) # in meters
scale_length = nice_values[nice_values <= max_physical_size].max()
bar_height = 0.2 * scale_length
scale_text = f"{int(scale_length)} m" if scale_length < 1000 else f"{int(scale_length / 1000)} km"
# Position in lower right corner
x_start = xmax - scale_length + xmin * 0.05
y_start = -(ymax - bar_height + ymin * 0.05)
rect = plt.Rectangle((x_start, y_start), scale_length, bar_height, color="black")
ax.add_patch(rect)
# Label above the scale bar
ax.text(
x_start + scale_length / 2,
y_start + 2 * bar_height,
scale_text,
color="black",
ha="center",
va="bottom",
fontweight="bold",
)
if label:
# Label in the upper left corner
x_start = xmin
y_start = ymax
if self.is_local:
va = "top" # Local surfaces are a circle with empty corners, so we can put the label in the upper left corner without it overlapping the surface image.
else:
va = "bottom" # Global surfaces fill most of the entire image, so we need to place the label above the plotting area to avoid overlap.
ax.text(
x_start,
y_start,
label,
color="black",
ha="left",
va=va,
fontweight="bold",
)
if colorbar:
norm = Normalize(vmin=vmin, vmax=vmax)
sm = plt.cm.ScalarMappable(norm=norm, cmap=cmap)
cbar = plt.colorbar(sm, ax=ax, orientation="horizontal", shrink=0.5)
cbar.set_label(label=f"{variable_long_name} [{variable_units}]")
if save:
file_prefix = f"{self.output_file_prefix}_{plot_style}"
if variable_name is not None:
file_prefix += f"_{variable_name}"
if interval is None:
uxds = self.uxds
filename = self.plot_dir / f"{file_prefix}.{self.output_image_file_extension}"
else:
uxds = self.read_saved_output(interval=interval, reset=False)
interval = uxds.interval.values.item()
uxds = uxds.sel(interval=interval)
filename = self.plot_dir / f"{file_prefix}{interval:06d}.{self.output_image_file_extension}"
plt.savefig(filename, pad_inches=0, dpi=ax.figure.get_dpi())
if show:
plt.show()
if close_when_done and (save or show):
plt.close()
return ax
[docs]
def pyvista_plotter(
self,
variable_name: str | None = None,
variable: ArrayLike | None = None,
interval: int | None = None,
theme: str | None = None,
transparent_background: bool | None = None,
plotter: pv.Plotter | None = None,
enable_interactive: bool = True,
**kwargs: Any,
) -> pv.Plotter:
"""
Show the local surface region using an interactive 3D plot with PyVista.
Parameters
----------
variable_name : str | None, optional
The name of the variable to plot. If the name of the variable is not already stored on the surface mesh, then the `variable` argument must also be passed. Default is None, which will plot a greyscale image of the surface.
variable : (n_face) array, optional
An array face values that will be used to color the surface mesh. This is required if `variable_name` is not a face variable that is already saved in the the uxds dataset. Default is None.
interval : int | None, optional
The interval number of the surface to plot. If None, the currently loaded surface data will be used. Default is None.
theme : str, optional
The PyVista plot theme to use. If None, the default PyVista theme will be used. Default is None.
transparent_background : bool, optional
If True, the background of the plot will be transparent. Default is None, which will use the default background setting for the chosen plot theme.
plotter : pyvista.Plotter, optional
A pre-existing Plotter object to use. If None, then a new one will be created and returned. Default is None.
enable_interactive : bool, optional
If True, the default PyVista key events will be updated to include custom events for toggling scalar visibility, changing the camera view, and showing a help message. Default is True.
**kwargs : Any
|kwargs|
Returns
-------
pyvista.Plotter
The PyVista Plotter object for further customization.
"""
from cratermaker.constants import PYVISTA_ADD_MESH_KWARGS
from cratermaker.utils.general_utils import toggle_pyvista_actor, update_pyvista_help_message
def _set_title(uxds, variable_name):
if variable_name in uxds and "long_name" in uxds[variable_name].attrs:
if "units" in uxds[variable_name].attrs:
title = f"{uxds[variable_name].attrs['long_name']} ({uxds[variable_name].attrs['units']})"
else:
title = uxds[variable_name].attrs["long_name"]
else:
title = variable_name
return title
def update_scalars(plotter, cmap):
camera_orig = plotter.camera
mesh_actor = plotter.actors[self.target.name]
if not plotter.scalar_bars:
scalar_bar_actor = None
else:
old_title = next(iter(plotter.scalar_bars.keys()))
scalar_bar_actor = plotter.scalar_bars[old_title]
scalar_names = [""] # Add entry for no scalars
for v in mesh_actor.mapper.dataset.array_names:
if v in mesh_actor.mapper.dataset.cell_data:
scalar_names.append(v)
scalar_names = list(set(scalar_names)) # Remove duplicates
scalar_names.sort()
if mesh_actor.mapper.GetScalarVisibility():
idx = scalar_names.index(mesh_actor.mapper.array_name)
idx += 1
if idx == len(scalar_names):
idx = 0
else: # We are currently not showing scalars, so start over from the first non-empty slot
idx = 1
next_scalar_name = scalar_names[idx]
if next_scalar_name == "":
mesh_actor.mapper.SetScalarVisibility(False)
if scalar_bar_actor is not None:
scalar_bar_actor.SetVisibility(False)
else:
mesh_actor.mapper.SetScalarVisibility(True)
mesh_actor.mapper.array_name = next_scalar_name
mesh_actor.mapper.dataset.active_scalars_name = next_scalar_name
mesh_actor.mapper.scalar_range = mesh_actor.mapper.dataset.get_data_range(arr_var=next_scalar_name)
mesh_actor.mapper.SetColorModeToMapScalars()
mesh_actor.mapper.lookup_table.cmap = cmap
mesh_actor.mapper.SetScalarVisibility(True)
title = _set_title(self.uxds, next_scalar_name)
if scalar_bar_actor is None:
scalar_bar_actor = plotter.add_scalar_bar(title=title, mapper=mesh_actor.mapper)
else:
scalar_bar_actor.SetVisibility(True)
scalar_bar_actor.SetTitle(title)
plotter.camera = camera_orig
plotter.update()
return
def _help_message(help_message: str | None = None):
help_message += "\nv: Isometric view"
help_message += "\nUp/Down: Zoom in/out"
help_message += "\n+/-: Increase/decrease point size"
help_message += "\nw: Wireframe view"
help_message += "\ns: Shaded view"
help_message += "\nC: Enable cell picking"
help_message += "\nh: Toggle this help message"
help_message += "\nq: Quit"
help_actor = pv.CornerAnnotation(0, help_message, name="help")
help_actor.SetVisibility(False)
return help_actor
def reset_view(plotter):
if self.is_global:
plotter.reset_camera()
plotter.view_yz()
else:
# Compute camera position so that it sits over the local region and the region fills the frame
center_face_ind = self.surface.find_nearest_face(self.location)
local_center = np.array(
[
self.surface.face_x[center_face_ind],
self.surface.face_y[center_face_ind],
self.surface.face_z[center_face_ind],
]
)
# Add a 20% buffer to keep the region fully in view
d = 1.2 * self.radius / np.tan(np.radians(plotter.camera.view_angle / 2))
distance_multiplier = 1.0 + d / self.target.radius
plotter.camera_position = local_center * distance_multiplier
plotter.camera.focal_point = local_center
plotter.camera.clipping_range = (0.35 * plotter.camera.distance, 2.0 * plotter.camera.distance)
return
if theme is not None:
pv.set_plot_theme(theme)
if transparent_background is not None:
pv.global_theme.transparent_background = transparent_background
new_plotter = plotter is None
if new_plotter:
plotter = pv.Plotter()
plotter.enable_hidden_line_removal()
if interval is None:
uxds = self.uxds
else:
uxds = self.read_saved_output(interval=interval).isel(interval=0)
interval = None
mesh = self.to_vtk_mesh(uxds)
if new_plotter:
reset_view(plotter)
face_variables = []
component_variables = []
for v in uxds.data_vars:
if uxds[v].ndim > 0 and uxds[v].shape[0] == self.n_face:
mesh.cell_data[v] = uxds[v].data
face_variables.append(v)
if uxds[v].ndim == 2:
component_variables.append(v)
component = kwargs.pop("component", None)
if variable_name is not None and variable_name in face_variables:
title = _set_title(uxds, variable_name)
if variable_name in component_variables:
component = 0 if component is None else component
title += f" (layer {component})"
else:
if variable is not None:
variable = np.asarray(variable, dtype=np.float64)
if variable.shape[0] != self.n_face:
raise ValueError("variable must be a string or an array with the same size as the number of faces in the grid")
mesh.cell_data[variable_name] = variable
title = variable_name
if variable.ndim == 2:
component = 0 if component is None else component
title += f" (layer {component})"
elif variable_name is not None:
raise ValueError(
f"Variable '{variable_name}' not found in the surface data. The 'variable' argument must be provided with scalar values for the faces."
)
if variable_name is None:
scalars = "face_elevation" # Add a default scalar to enable the cmap to be applied properly if we decide to turn them back on later
else:
scalars = variable_name
cmap = kwargs.pop("cmap", "cividis")
add_mesh_kwargs = {k: v for k, v in kwargs.items() if k in PYVISTA_ADD_MESH_KWARGS}
add_mesh_kwargs = {
"name": self.target.name,
"show_edges": False,
"show_scalar_bar": False,
"color": "grey",
"pbr": True,
**add_mesh_kwargs,
}
mesh_actor = plotter.add_mesh(mesh, scalars=scalars, component=component, cmap=cmap, **add_mesh_kwargs)
if new_plotter and self.is_global:
plotter.view_yz()
if variable_name is None:
mesh_actor.mapper.SetScalarVisibility(False)
else:
plotter.add_scalar_bar(title=title, mapper=mesh_actor.mapper)
if enable_interactive and new_plotter:
plotter = update_pyvista_help_message(plotter, new_message="j: Cycle through scalar face variables")
plotter.add_key_event("j", lambda plotter=plotter, cmap=cmap: update_scalars(plotter, cmap=cmap))
plotter.add_key_event("r", lambda plotter=plotter: reset_view(plotter))
return plotter
[docs]
def export_region_polygon(self, driver: str = "GPKG", **kwargs: Any) -> None:
"""
Export the local surface region as a polygon to a vector file.
This will create a polygon that can be used for the `OpenCraterTool <https://github.com/thomasheyer/OpenCraterTool>`__ plugin in QGIS.
Parameters
----------
driver : str, optional
The OGR driver to use for exporting the polygon. Default is "GPKG"
**kwargs : Any
|kwargs|
"""
import geopandas as gpd
import pandas as pd
from cratermaker.components.crater import Crater
from cratermaker.constants import VECTOR_DRIVER_TO_EXTENSION_MAP
if driver.upper() in VECTOR_DRIVER_TO_EXTENSION_MAP:
file_extension = VECTOR_DRIVER_TO_EXTENSION_MAP[driver.upper()]
else:
raise ValueError("Cannot infer file extension from driver {driver}.")
# If this is a local surface, we will use the Crater functionality to generate a polygon representation of it
if self.is_local:
region = Crater.maker(radius=self.region_radius, location=self.location)
geoms = region.to_geoseries(surface=self.surface, split_antimeridian=False, use_measured_properties=False).to_crs(
self.crs
)
df = pd.DataFrame(
{"area": [self.area], "area_name": ["local_region"]},
index=[0, 1],
)
gdf = gpd.GeoDataFrame(data=df, geometry=geoms, crs=self.crs)
filename = self.export_dir / f"local_surface_AREA.{file_extension}"
if file_extension == "shp":
gdf.to_file(filename, driver=driver, **kwargs)
else:
gdf.to_file(filename, layer="local_surface_AREA", driver=driver, **kwargs)
return
[docs]
def compute_distances(
self,
locations: ArrayLike,
reference_location: PairOfFloats | None = None,
validate: bool = True,
) -> NDArray[np.float64]:
"""
Calculate the great circle distance between one point and one or more other points in meters.
Parameters
----------
locations : ArrayLike
Longitude and latitude of the second point or array of points in degrees.
reference_location : PairOfFloats | None, optional
Longitude and latitude of the first point in degrees. If None, then the center of the local region will be used. Default is None.
validate : bool, optional
If the locations should be validated and normalized. Only use if the validation is causing a performance bottleneck.
Returns
-------
NDArray
Great circle distance between the two points in meters.
Notes
-----
This is a wrapper for a compiled Rust function and is intended to be used as a helper to calculate_face_and_node_distances.
"""
if reference_location is None:
if self.is_local:
reference_location = self.location
else:
raise ValueError("reference_location must be provided for global surfaces")
if validate:
locations = validate_and_normalize_location(locations)
reference_location = validate_and_normalize_location(reference_location)
locations = np.radians(locations)
lon1, lat1 = np.radians(reference_location)
if locations.ndim == 1 and locations.size == 2:
locations = np.expand_dims(locations, axis=0)
return surface_bindings.compute_distances(
lon1=lon1, lat1=lat1, lon2=locations[:, 0], lat2=locations[:, 1], radius=self.surface.radius
)
[docs]
def compute_bearings(self, locations: ArrayLike, reference_location: PairOfFloats | None = None) -> NDArray[np.float64]:
"""
Calculate the initial bearing from one point to one or more other points in degrees.
Parameters
----------
locations : ArrayLike
Longitude and latitude of the second point or array of points in degrees.
reference_location : PairOfFloats | None, optional
Longitude and latitude of the first point in degrees. If None, then the center of the local region will be used. Default is None.
Returns
-------
NDArray
Initial bearing from the first point to the second point or points in degrees.
Notes
-----
This is intended to be used as a helper to calculate_face_and_node_bearings.
"""
if reference_location is None:
if self.is_local:
reference_location = self.location
else:
raise ValueError("reference_location must be provided for global surfaces")
locations = np.asarray(locations, dtype=np.float64)
if locations.ndim == 1 and locations.size == 2:
locations = locations.reshape((1, 2))
lon1, lat1 = np.radians(validate_and_normalize_location(reference_location))
lon2, lat2 = np.radians(locations[:, 0]), np.radians(locations[:, 1])
bearing = surface_bindings.compute_bearings(lon1=lon1, lat1=lat1, lon2=lon2, lat2=lat2)
return np.degrees(bearing)
[docs]
def compute_location_from_distance_bearing(
self,
distance: FloatLike | ArrayLike,
bearing: FloatLike | ArrayLike,
reference_location: PairOfFloats | None = None,
) -> NDArray[np.float64]:
"""
Calculate the longitude and latitude of one or more points given a reference point, initial bearings, and distances.
Parameters
----------
bearing : FloatLike | ArrayLike
One or more initial bearing values from the reference point to the target point or points in degrees.
distance : FloatLike | ArrayLike
One or more great circle distance from the reference point to the target point or points in meters.
reference_location : PairOfFloats | None, optional
One pair of longitude and latitude of the reference point in degrees. If None, then the center of the local region will be used. Default is None.
Returns
-------
tuple or list of tuples the same length as bearing and distance.
longitude and latitude as a tuple of floats in degrees.
"""
if reference_location is None:
if self.is_local:
reference_location = self.location
else:
raise ValueError("reference_location must be provided for global surfaces")
lon1, lat1 = np.radians(validate_and_normalize_location(reference_location))
bearing = np.atleast_1d(np.radians(bearing))
distance = np.atleast_1d(distance).astype(np.float64)
if bearing.shape != distance.shape:
raise ValueError("bearing and distance must have the same shape")
if bearing.ndim > 1:
raise ValueError("bearing and distance must have the same number of elements")
lonlat2 = surface_bindings.compute_location_from_distance_bearing(
lon1=lon1, lat1=lat1, distances=distance, bearings=bearing, radius=self.surface.radius
)
return validate_and_normalize_location(np.degrees(lonlat2))
@staticmethod
def _remap_connectivity_to_local(
connectivity_array: np.ndarray,
row_indices: np.ndarray,
value_indices: np.ndarray,
total_value_size: int,
fill_value: int = INT_FILL_VALUE,
) -> np.ndarray:
"""
Remap a 2D connectivity array where the rows are indexed by one entity (e.g., edges) and the values are global references to another entity (e.g., faces).
Only rows in `row_indices` are retained, and their values are remapped from global to local indices using `value_indices`.
Parameters
----------
connectivity_array : np.ndarray
The global 2D connectivity array of shape (N, K).
row_indices : np.ndarray
Global indices indicating which rows to retain.
value_indices : np.ndarray
Global indices of values to remap (e.g., face indices).
total_value_size : int
Total number of global items in the value domain (e.g., total number of faces).
fill_value : int, optional
Fill value indicating invalid entries (default is INT_FILL_VALUE).
Returns
-------
np.ndarray
A new 2D connectivity array of shape (len(row_indices), K) with local indices or fill_value where not applicable.
"""
if isinstance(row_indices, slice) and row_indices == slice(None): # This is a global slice. Just return the original array
return connectivity_array
if isinstance(value_indices, slice):
value_indices = np.arange(total_value_size)[value_indices]
# Build global-to-local index mapping
mapping = np.full(total_value_size, fill_value, dtype=np.int64)
mapping[value_indices] = np.arange(value_indices.size)
subset = connectivity_array[row_indices]
remapped = np.full_like(subset, fill_value)
valid = subset != fill_value
remapped[valid] = mapping[subset[valid]]
return remapped
[docs]
def saved_output_files(self, **kwargs: Any) -> list[Path]:
"""
Check if the component has any output files in its output directory.
Returns
-------
list[Path]
A list of Path objects representing the files that would be removed during a reset operation. Returns an empty list if no files found
"""
output_files = super().saved_output_files(**kwargs)
# remove grid file from the list, as it is not removed during reset
if self.grid_file in output_files:
output_files.remove(self.grid_file)
return output_files
[docs]
def get_location_extents(self) -> tuple[float, float, float, float]:
"""
Computes the longitude and latitude extents of the local region.
Returns
-------
lon_min : float
Minimum longitude in degrees.
lon_max : float
Maximum longitude in degrees.
lat_min : float
Minimum latitude in degrees.
lat_max : float
Maximum latitude in degrees.
"""
if self.is_global:
raise ValueError("Cannot get the location extents of a global region.")
return self.surface.get_location_extents(self.location, self.radius)
@property
def surface(self) -> Surface:
"""The Surface object that contains the mesh data for the global surface."""
return self._surface
@property
def target(self):
"""The Target object associated with the LocalSurface."""
return self.surface.target
@property
def n_edge(self) -> int:
"""The number of edges in the view."""
if self._n_edge is None:
if isinstance(self._edge_indices, slice):
if np.isscalar(self._surface._uxds.uxgrid.n_edge):
self._n_edge = int(self._surface._uxds.uxgrid.n_edge)
else:
self._n_edge = int(self._surface._uxds.uxgrid.n_edge[self._edge_indices].size)
else:
self._n_edge = self._edge_indices.size
return self._n_edge
@property
def n_face(self) -> int:
"""The number of faces in the view."""
if self._n_face is None:
if isinstance(self.face_indices, slice):
if np.isscalar(self.surface.face_elevation.size):
self._n_face = int(self.surface.face_elevation.size)
else:
self._n_face = int(self.surface.face_elevation[self.face_indices].size)
elif isinstance(self.face_indices, np.ndarray):
self._n_face = int(self.face_indices.size)
return self._n_face
@property
def n_node(self) -> int:
"""The number of nodes in the view."""
if self._n_node is None:
if isinstance(self.node_indices, slice):
if np.isscalar(self.surface.node_elevation.size):
self._n_node = int(self.surface.node_elevation.size)
else:
self._n_node = int(self.surface.node_elevation[self.node_indices].size)
elif isinstance(self.node_indices, np.ndarray):
self._n_node = int(self.node_indices.size)
return self._n_node
@property
def n_nodes_per_face(self) -> NDArray:
"""The number of nodes per face in the view."""
return self.surface.n_nodes_per_face[self.face_indices]
@property
def face_size(self) -> NDArray:
"""The effective pixel size of faces in the view in meters."""
return self.surface.face_size[self.face_indices]
@property
def location(self) -> tuple[float, float]:
"""The (longitude, latitude) location of the center of the view in degrees."""
return self._location
@property
def region_radius(self) -> FloatLike:
"""The radius of the region to include in the view in meters."""
return self._region_radius
@property
def face_bearing(self) -> NDArray:
"""The initial bearing from the location to the faces measured clockwise relative to North in degrees."""
if self._location is not None and self._face_bearing is None:
self._face_bearing, self._node_bearing = self.calculate_face_and_node_bearings()
return self._face_bearing
@property
def node_bearing(self) -> NDArray:
"""The initial bearing from the location to the nodes measured clockwise relative to North in degrees."""
if self._location is not None and self._node_bearing is None:
self._face_bearing, self._node_bearing = self.calculate_face_and_node_bearings()
return self._node_bearing
@property
def face_distance(self) -> NDArray:
"""The distance from the location to the faces in meters."""
if self._location is not None and self._face_distance is None:
self._face_distance, self._node_distance = self.calculate_face_and_node_distances()
return self._face_distance
@property
def node_distance(self) -> NDArray:
"""The distance from the location to the nodes in meters."""
if self._location is not None and self._node_distance is None:
self._face_distance, self._node_distance = self.calculate_face_and_node_distances()
return self._node_distance
@property
def face_area(self) -> NDArray:
"""The areas of the faces in the view in m²."""
return self.surface.face_area[self.face_indices]
@property
def face_lat(self) -> NDArray:
"""Latitude of the center of the faces in the view in degrees."""
return self.surface.face_lat[self.face_indices]
@property
def face_lon(self) -> NDArray:
"""Longitude of the center of the faces in the viewin degrees."""
return self.surface.face_lon[self.face_indices]
@property
def face_x(self) -> NDArray:
"""Cartesian x location of the center of the faces in the view in meters."""
return self.surface.face_x[self.face_indices]
@property
def face_y(self) -> NDArray:
"""Cartesian y location of the center of the faces in the view in meters."""
return self.surface.face_y[self.face_indices]
@property
def face_z(self) -> NDArray:
"""Cartesian z location of the center of the faces in the view in meters."""
return self.surface.face_z[self.face_indices]
@property
def edge_face_distance(self) -> NDArray:
"""
Distances between the edges and the faces in the view in meters.
Dimensions: `(n_edge)`
"""
return self.surface.edge_face_distance[self.edge_indices]
@property
def edge_length(self) -> NDArray:
"""
Lengths of the edges in the view in meters.
Dimensions: `(n_edge)`
"""
return self.surface.edge_length[self.edge_indices]
@property
def node_lat(self) -> NDArray:
"""Latitude of the nodes in the view in degrees."""
return self.surface.node_lat[self.node_indices]
@property
def node_lon(self) -> NDArray:
"""Longitude of the nodes in the view in degrees."""
return self.surface.node_lon[self.node_indices]
@property
def node_x(self) -> NDArray:
"""Cartesian x location of the nodes in the view in meters."""
return self.surface.node_x[self.node_indices]
@property
def node_y(self) -> NDArray:
"""Cartesian y location of the nodes in the view in meters."""
return self.surface.node_y[self.node_indices]
@property
def node_z(self) -> NDArray:
"""Cartesian z location of the nodes in the view in meters."""
return self.surface.node_z[self.node_indices]
@property
def area(self) -> float:
"""The total area of the faces in the view in m²."""
if self._area is None:
self._area = float(self.face_area.sum())
return self._area
@property
def face_indices(self) -> NDArray:
"""The indices of the faces in the view."""
if self._face_indices is None:
raise ValueError("face_indices must be set to use this object.")
return self._face_indices
@property
def node_indices(self) -> NDArray:
"""The indices of the nodes in the view."""
if self._node_indices is None:
self._node_indices = np.unique(self.surface.face_node_connectivity[self.face_indices].ravel())
self._node_indices = self._node_indices[self._node_indices != INT_FILL_VALUE]
return self._node_indices
@property
def edge_indices(self) -> NDArray:
"""The indices of the edges in the view."""
if self._edge_indices is None:
self._edge_indices = np.unique(self.surface.face_edge_connectivity[self.face_indices].ravel())
self._edge_indices = self._edge_indices[self._edge_indices != INT_FILL_VALUE]
return self._edge_indices
@property
def edge_face_connectivity(self) -> NDArray:
"""
Local indices of the faces that make up the edges.
Dimensions: `(n_edge, 2)`
"""
if self._edge_face_connectivity is None:
self._edge_face_connectivity = self._remap_connectivity_to_local(
connectivity_array=self.surface.edge_face_connectivity,
row_indices=self.edge_indices,
value_indices=self.face_indices,
total_value_size=self.surface.n_face,
)
return self._edge_face_connectivity
@property
def face_edge_connectivity(self) -> NDArray:
"""
Local indices of the edges that make up the faces.
Dimensions: `(n_face, n_max_face_edges)`
"""
if self._face_edge_connectivity is None:
self._face_edge_connectivity = self._remap_connectivity_to_local(
connectivity_array=self.surface.face_edge_connectivity,
row_indices=self.face_indices,
value_indices=self.edge_indices,
total_value_size=self.surface.n_edge,
)
return self._face_edge_connectivity
@property
def face_node_connectivity(self) -> NDArray:
"""
Local indices of the nodes that make up the faces.
Dimensions: `(n_face, n_max_face_nodes)`
Nodes are in counter-clockwise order.
"""
if self._face_node_connectivity is None:
self._face_node_connectivity = self._remap_connectivity_to_local(
connectivity_array=self.surface.face_node_connectivity,
row_indices=self.face_indices,
value_indices=self.node_indices,
total_value_size=self.surface.n_node,
)
return self._face_node_connectivity
@property
def face_face_connectivity(self) -> NDArray:
"""
Indices of the faces that surround the faces.
Dimensions: `(n_face, n_max_face_faces)`
"""
if self._face_face_connectivity is None:
self._face_face_connectivity = self._remap_connectivity_to_local(
connectivity_array=self.surface.face_face_connectivity,
row_indices=self.face_indices,
value_indices=self.face_indices,
total_value_size=self.surface.n_face,
)
return self._face_face_connectivity
@property
def node_face_connectivity(self) -> NDArray:
"""
Indices of the faces that surround the nodes.
Dimensions: `(n_node, n_max_node_faces)`
"""
if self._node_face_connectivity is None:
self._node_face_connectivity = self._remap_connectivity_to_local(
connectivity_array=self.surface.node_face_connectivity,
row_indices=self.node_indices,
value_indices=self.face_indices,
total_value_size=self.surface.n_face,
)
return self._node_face_connectivity
@property
def edge_node_connectivity(self) -> NDArray:
"""
Local indices of the nodes that make up the edges.
Dimensions: `(n_edge, 2)`
"""
if self._edge_node_connectivity is None:
self._edge_node_connectivity = self._remap_connectivity_to_local(
connectivity_array=self.surface.edge_node_connectivity,
row_indices=self.edge_indices,
value_indices=self.node_indices,
total_value_size=self.surface.n_node,
)
return self._edge_node_connectivity
@property
def crs(self) -> CRS:
"""
Return a local Azimuthal Equidistant (LAEA) CRS centered on `self.location` (in meters).
If `self.location` is not set, fall back to the parent `Surface` geographic CRS.
The LAEA CRS uses the target body's spherical radius.
"""
if self._crs is None:
self._crs = self.surface.get_crs(
radius=self.surface.radius,
name=self.surface.target.name,
location=self.location,
)
return self._crs
[docs]
def read_saved_output(self, interval: int | None = None, reset: bool = False, **kwargs: Any) -> uxr.UxDataset:
"""
Read the saved local surface data from disk for a given interval number and return it as a UxDataset.
Parameters
----------
interval : int | None, optional
Interval number of data file to read. Default is None (all intervals are read).
reset : bool, optional
Flag to indicate whether to reset the surface. If True it reads in the grid but creates an empty dataset and saves it to interval 0. Default is False.
**kwargs : Any
|kwargs|
Returns
-------
UxDataset
An initialized UxDataset object containing the grid and data.
Dataset
The xarray Dataset containing only the local surface data.
"""
if self.is_local:
uxds_global = self.surface.read_saved_output(interval=interval, **kwargs)
if not reset:
return uxr.UxDataset(uxds_global.sel(n_face=self.face_indices, n_node=self.node_indices), uxgrid=self.uxgrid)
else:
return uxr.UxDataset(uxgrid=self.uxgrid)
if reset:
interval = 0
filename = self.output_dir / self.output_filename(interval)
filename.unlink(missing_ok=True)
files = super().read_saved_output(interval=interval, **kwargs)
if isinstance(files, tuple):
ds, grid = files
else:
grid = files
ds = None
if grid is None:
raise ValueError("No grid file found.")
uxgrid = uxr.Grid.from_dataset(grid)
if ds is None:
reset = True
if reset:
uxds = uxr.UxDataset(uxgrid=uxgrid)
else:
uxds = uxr.UxDataset.from_xarray(ds, uxgrid=uxgrid)
for v in uxds.data_vars:
uxds[v].attrs = ds[v].attrs.copy()
# Ensure that the tags are the correct data type
if "layer" in ds[v].dims:
uxds[v] = uxds[v].astype(np.uint32)
return uxds
@property
def uxgrid(self) -> uxr.Grid:
"""The Uxarrray Grid representation of the local surface."""
if self._uxgrid is None:
if self.is_global:
self._uxgrid = self.surface.uxgrid
else:
grid_ds = xr.Dataset(
data_vars={
"face_x": (("n_face",), self.face_x),
"face_y": (("n_face",), self.face_y),
"face_z": (("n_face",), self.face_z),
"face_lat": (("n_face",), self.face_lat),
"face_lon": (("n_face",), self.face_lon),
"node_x": (("n_node",), self.node_x),
"node_y": (("n_node",), self.node_y),
"node_z": (("n_node",), self.node_z),
"node_lat": (("n_node",), self.node_lat),
"node_lon": (("n_node",), self.node_lon),
"edge_face_distance": (("n_edge",), self.edge_face_distance),
"edge_length": (("n_edge",), self.edge_length),
"edge_face_connectivity": (
("n_edge", "two"),
self.edge_face_connectivity,
),
"face_edge_connectivity": (
("n_face", "n_max_face_edges"),
self.face_edge_connectivity,
),
"face_node_connectivity": (
("n_face", "n_max_face_nodes"),
self.face_node_connectivity,
),
"face_face_connectivity": (
("n_face", "n_max_face_faces"),
self.face_face_connectivity,
),
"node_face_connectivity": (
("n_node", "n_max_node_faces"),
self.node_face_connectivity,
),
"edge_node_connectivity": (
("n_edge", "two"),
self.edge_node_connectivity,
),
},
)
grid_ds["grid_topology"] = self.surface.uxgrid._ds.grid_topology
self._uxgrid = uxr.Grid.from_dataset(grid_ds)
return self._uxgrid
@property
def uxds(self) -> UxDataset:
"""The UxDataset representation of the local surface."""
if self.is_global:
return self.surface.uxds
return uxr.UxDataset(self.surface.uxds.sel(n_face=self.face_indices, n_node=self.node_indices), uxgrid=self.uxgrid)
@property
def grid_file(self):
"""Path to the grid file."""
return self.output_dir / f"{self._grid_file_prefix}.{self._output_file_extension}"
@property
def pix(self) -> float:
"""The effective pixel size of the base surface in meters."""
return self.surface.pix
@property
def radius(self) -> float:
"""The radius of the local region in meters."""
return self.region_radius
@property
def plot_dir(self) -> Path:
"""The directory to save plots to."""
return self.surface.plot_dir
@property
def from_surface(self) -> Transformer:
"""A pyproj Transformer object to convert from the surface CRS to the local CRS."""
if self._from_surface is None:
self._from_surface = Transformer.from_crs(
self.surface.crs,
self.crs,
always_xy=True,
)
return self._from_surface
@property
def to_surface(self) -> Transformer:
"""A pyproj Transformer object to convert from the local CRS to the surface CRS."""
if self._to_surface is None:
self._to_surface = Transformer.from_crs(
self.crs,
self.surface.crs,
always_xy=True,
)
return self._to_surface
[docs]
def set_face_proj(self):
"""
Set the projected x and y coordinates of the faces relative to the LocalSurface center.
"""
if self.face_distance is not None and self.face_bearing is not None and self._face_proj_x is None:
self._face_proj_x = self.face_distance * np.sin(np.radians(self.face_bearing))
self._face_proj_y = self.face_distance * np.cos(np.radians(self.face_bearing))
return
[docs]
def set_node_proj(self):
"""
Set the projected x and y coordinates of the nodes relative to the LocalSurface center.
"""
if self.node_distance is not None and self.node_bearing is not None and self._node_proj_x is None:
self._node_proj_x = self.node_distance * np.sin(np.radians(self.node_bearing))
self._node_proj_y = self.node_distance * np.cos(np.radians(self.node_bearing))
return
@property
def face_proj_x(self) -> NDArray:
"""The projected x coordinates of the faces relative to the LocalSurface center in meters."""
return self._face_proj_x
@property
def face_proj_y(self) -> NDArray:
"""The projected y coordinates of the faces relative to the LocalSurface center in meters."""
return self._face_proj_y
@property
def node_proj_x(self) -> NDArray:
"""The projected x coordinates of the nodes relative to the LocalSurface center in meters."""
return self._node_proj_x
@property
def node_proj_y(self) -> NDArray:
"""The projected y coordinates of the nodes relative to the LocalSurface center in meters."""
return self._node_proj_y
@property
def desloped_face_elevation(self) -> NDArray:
"""The face elevations with the mean slope of the region removed in meters."""
return self._desloped_face_elevation
[docs]
def compute_desloped_face_elevation(self):
"""Compute the face elevations with the mean slope of the region removed."""
if self.is_local and self._desloped_face_elevation is None:
reference_elevation = self.get_reference_surface(only_faces=True)
self._desloped_face_elevation = self.uxds.face_elevation.data - reference_elevation
return
@property
def is_local(self) -> bool:
"""Whether this surface is a local region or the full global surface."""
return self.location is not None
@property
def is_global(self) -> bool:
"""Whether this surface is the full global surface."""
return self.location is None
[docs]
def data_composer(self) -> DataComposer:
"""
Creates a ``DataComposer`` tied to this ``LocalSurface``. This should usually be called in a ``with`` statement.
Returns
-------
DataComposer
"""
return DataComposer(self)
@property
def face_variables(self) -> list[str]:
"""
Returns a list of the data variables currently being stored on the faces.
"""
return [v for v in self.uxds.variables if "n_face" in self.uxds[v].dims]
@property
def node_variables(self) -> list[str]:
"""
Returns a list of the data variables currently being stored on the nodes.
"""
return [v for v in self.uxds.variables if "n_node" in self.uxds[v].dims]
[docs]
class DataComposer(AbstractContextManager):
"""
Created using ``Surface.data_composer`` or ``LocalSurface.data_composer``. Contains some static methods for loading LOLA data.
"""
def __init__(self, surface: LocalSurface):
"""
**Warning:** This object should not be instantiated directly. Instead, use ``Surface.data_composer()`` or ``LocalSurface.data_composer()``.
"""
self._localsurface = surface
self._data_list: list[DatasetReader] = []
self._finished = False
[docs]
def add_data(
self,
data: DatasetReader | str | list[DatasetReader | str],
):
"""
Adds data to eventually be applied to the surface.
The data isn't applied until ``finish`` is called or, if appplicable, ``self`` exits the ``with`` context.
Parameters
----------
dataset : DatasetReader | str | list[DatasetReader | str]
A single dataset or a list of multiple datasets to be merged.
Calls ``rasterio.open`` when necessary.
"""
if self._finished:
raise ValueError(f"{type(self).__name__} is already finished or cancelled.")
if not isinstance(data, list):
data = [data]
for i, src in enumerate(data):
if not isinstance(src, DatasetReader):
data[i] = rasterio.open(src)
self._data_list.append(data[i])
[docs]
def cancel(self):
"""
Cancels the execution and closes all open datasets.
"""
for dataset in self._data_list:
dataset.close()
self._data_list = []
self._finished = True
[docs]
def finish(self):
"""
Apply all the datasets to the mesh and close them. This is implicitly called when exiting a with context.
"""
if self._finished:
raise ValueError(f"{type(self).__name__} is already finished or cancelled.")
def _orderable_distance(lon1, lat1, lon2, lat2):
_nnon1 = np.deg2rad(lon1)
lat1 = np.deg2rad(lat1)
lon2 = np.deg2rad(lon2)
lat2 = np.deg2rad(lat2)
dlon = (lon2 - lon1 + np.pi) % (2 * np.pi)
dlat = lat2 - lat1
return np.pow(np.sin(dlat / 2), 2) + np.cos(lat1) * np.cos(lat2) * np.pow(np.sin(dlon / 2), 2)
def _read(dataset: DatasetReader, window: Window | None = None) -> tuple[NDArray, Window]:
block_windows = []
for _, block in dataset.block_windows(1):
if window is None or windows.intersect(window, block):
block_windows.append(block)
res_window: Window = windows.union(block_windows)
print(f" Reading {dataset.name} ({res_window.width}x{res_window.height}px of {dataset.width}x{dataset.height}px)")
res = np.empty((res_window.height, res_window.width))
for block in tqdm(block_windows):
out = res[
block.row_off - res_window.row_off : block.row_off - res_window.row_off + block.height,
block.col_off - res_window.col_off : block.col_off - res_window.col_off + block.width,
]
dataset.read(1, window=block, out=out)
return res, res_window
print(f"Applying {len(self._data_list)} dataset{'' if len(self._data_list) == 1 else 's'} to the mesh")
lons = np.concatenate((self._localsurface.face_lon, self._localsurface.node_lon))
lats = np.concatenate((self._localsurface.face_lat, self._localsurface.node_lat))
read_data: list[NDArray] = []
data_windows: list[Window] = []
pix_dist = np.full((len(self._data_list), len(lons)), np.inf)
for n, dataset in enumerate(self._data_list):
to_data = Transformer.from_crs(self._localsurface.surface.crs, dataset.crs)
from_data = Transformer.from_crs(dataset.crs, self._localsurface.surface.crs)
x_coords, y_coords = to_data.transform(lons, lats)
mask1 = np.isfinite(x_coords) & np.isfinite(y_coords)
rows, cols = rowcol(dataset.transform, x_coords[mask1], y_coords[mask1])
mask2 = (cols >= 0) & (cols < dataset.width) & (rows >= 0) & (rows < dataset.height)
mask1[mask1] = mask2
rows = rows[mask2]
cols = cols[mask2]
if self._localsurface.is_local:
cmin, rmin, cmax, rmax = np.min(cols), np.min(rows), np.max(cols), np.max(rows)
data, window = _read(dataset, window=Window(cmin, rmin, cmax - cmin + 1, rmax - rmin + 1))
else:
data, window = _read(dataset)
read_data.append(data)
data_windows.append(window)
values = data[rows - window.row_off, cols - window.col_off]
print(" Calculating pixel distances")
mask3 = np.isfinite(values) & (values != dataset.nodata)
mask1[mask1] = mask3
x_pix, y_pix = from_data.transform(*dataset.xy(rows[mask3], cols[mask3]))
pix_dist[n, mask1] = _orderable_distance(lons[mask1], lats[mask1], x_pix, y_pix)
idx = np.argmin(pix_dist, axis=0)
global_mask = np.isfinite(pix_dist[idx, np.arange(len(idx))])
elevation = np.zeros_like(idx, dtype=np.float32)
print(" Setting elevation data")
for n, (dataset, data, window) in enumerate(zip(self._data_list, read_data, data_windows, strict=True)):
to_data = Transformer.from_crs(self._localsurface.surface.crs, dataset.crs)
mask = global_mask & (idx == n)
if "KILOMETER" in dataset.units:
scale_factor = 1000.0
else:
scale_factor = 1.0
x_coords, y_coords = to_data.transform(lons[mask], lats[mask])
rows, cols = rowcol(dataset.transform, x_coords, y_coords)
elevation[mask] = data[rows - window.row_off, cols - window.col_off] * scale_factor
dataset.close()
self._localsurface.update_elevation(elevation)
self._data_list = []
self._finished = True
print(" Done")
def __enter__(self) -> DataComposer:
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.finished:
return
if exc_type is None:
self.finish()
else:
self.cancel()
@staticmethod
def _get_lola_cylindrical_url_from_pds(
pds_file_resolution: int, location: PairOfFloats, boundary_offset: tuple[int, int] = (0, 0)
) -> str:
"""
Retrieve the appropriate LOLA DEM file url for a given location and resolution from the PDS.
Parameters
----------
pds_file_resolution : int
DEM resolution in meters per pixel.
location : PairOfFloats,
The longitude and latitude of the location in degrees.
boundary_offset : tuple[int, int], optional
Offset to apply to tile index to access neighboring tiles. Default is (0, 0). This is used when the local region crosses one or more tile boundaries.
Returns
-------
url : str
Link to the DEM file
Notes
-----
This is meant to be used for mid-latitude regions on the Moon between -60 and 60 degrees latitude.
For resolutions of 128, 256, and 512 pix/deg, this will return the SLDEM2015 datasets.
"""
lola_cylindrical_url = (
"https://pds-geosciences.wustl.edu/lro/lro-l-lola-3-rdr-v1/lrolol_1xxx/data/lola_gdr/cylindrical/float_img/"
)
sldem_global_url = "https://pds-geosciences.wustl.edu/lro/lro-l-lola-3-rdr-v1/lrolol_1xxx/data/sldem2015/global/float_img/"
sldem_tile_url = "https://pds-geosciences.wustl.edu/lro/lro-l-lola-3-rdr-v1/lrolol_1xxx/data/sldem2015/tiles/float_img/"
def _get_pds_file_suffix(pds_file_resolution, location, boundary_offset):
if location[0] < 0:
location = (location[0] + 360, location[1])
if pds_file_resolution == 512:
dlon = 45
dlat = 30
elif pds_file_resolution == 256:
dlon = 120
dlat = 60
latdir = "n" if location[1] + boundary_offset[1] * dlat > 0 else "s"
latval = np.abs(location[1] / dlat) + boundary_offset[1]
if latdir == "n":
latlo = int(np.ceil(latval - 1) * dlat)
lathi = int(latlo + dlat)
else:
lathi = int(np.floor(latval + 1) * dlat)
latlo = int(lathi - dlat)
lonval = np.abs(location[0] / dlon) + boundary_offset[0]
# Pole crossing, so flip the longitudes by 180 degrees
if lathi > 90 or latlo > 90:
lathi = 90
latlo = lathi - dlat
lonval += 180 / dlon
lonlo = int(np.floor(lonval) * dlon) % 360
lonhi = int(lonlo + dlon)
# Check to make sure the lo and hi values are in the right direction
if (latdir.upper() == "N" and latlo > lathi) or (latdir.upper() == "S" and latlo < lathi):
tmp = latlo
latlo = lathi
lathi = tmp
if pds_file_resolution == 512:
return f"{latlo:02d}{latdir.lower()}_{lathi:02d}{latdir.lower()}_{lonlo:03d}_{lonhi:03d}"
elif pds_file_resolution == 256:
return f"{latlo:d}{latdir.lower()}_{lathi:d}{latdir.lower()}_{lonlo:03d}_{lonhi:03d}"
if pds_file_resolution >= 256:
url = f"{sldem_tile_url}sldem2015_{pds_file_resolution:d}_{_get_pds_file_suffix(pds_file_resolution, location, boundary_offset)}_float.xml"
elif pds_file_resolution == 128:
url = f"{sldem_global_url}sldem2015_{pds_file_resolution:d}_60s_60n_000_360_float.xml"
else:
url = f"{lola_cylindrical_url}ldem_{pds_file_resolution:d}_float.xml"
return url
[docs]
@staticmethod
def get_lola_cylindrical_files_from_pds(
resolution: FloatLike, lat_range: PairOfFloats, lon_range: PairOfFloats
) -> tuple[list[str], int]:
"""
Retrieve the appropriate cylindrically projected LOLA DEM file url or list of urls for a given location and resolution from the PDS.
This will determine which, if any, boundaries are crossed and will return a list of files to cover a region.
Parameters
----------
resolution : FloatLike
Requested resolution in degrees per pixel. The closest available resolution will be used.
lat_range : tuple of float, optional
The (min_lat, max_lat) in degrees of the local region.
lon_range : tuple of float, optional
The (min_lon, max_lon) in degrees of the local region.
"""
AVAILABLE_RESOLUTIONS = [4, 16, 64, 128, 256, 512] # pix / deg
diffs = [abs(resolution - res) for res in AVAILABLE_RESOLUTIONS]
pds_file_resolution = AVAILABLE_RESOLUTIONS[np.argmin(diffs)]
lat_min, lat_max = lat_range
lon_min, lon_max = lon_range
center = ((lon_min + lon_max) / 2.0, (lat_min + lat_max) / 2.0)
# First, retrive the file for the centerpoint:
filelist = [DataComposer._get_lola_cylindrical_url_from_pds(pds_file_resolution, center)]
if pds_file_resolution < 256:
return (
filelist,
pds_file_resolution,
) # These files cover the entire globe, no need to determine if boundaries are crossed
combo = [(lon_min, lat_min), (lon_min, lat_max), (lon_max, lat_min), (lon_max, lat_max)]
for loc in combo:
f = DataComposer._get_lola_cylindrical_url_from_pds(pds_file_resolution, loc)
if f not in filelist:
filelist.append(f)
return filelist, pds_file_resolution
[docs]
@staticmethod
def get_lola_polar_files_from_pds(resolution: FloatLike, lat_range: PairOfFloats) -> tuple[list[str], int]:
"""
Retrieve the appropriate polar projected LOLA DEM file url for a given location and resolution from the PDS.
Parameters
----------
resolution : FloatLike
Requested resolution in meters per pixel. The closest available resolution will be used.
lat_range : tuple of float
The (min_lat, max_lat) in degrees of the local region.
"""
import rasterio
src_url = "https://pds-geosciences.wustl.edu/lro/lro-l-lola-3-rdr-v1/lrolol_1xxx/data/lola_gdr/polar/float_img/"
AVAILABLE_RESOLUTIONS = [400, 240, 200, 120, 100, 80, 60, 40, 30, 20, 10, 5]
MIN_LAT = [45, 60, 45, 60, 45, 80, 60, 80, 75, 80, 85, 87.5]
if lat_range[0] > 0:
pole = "n"
min_lat_index = 0 # in the northern hemisphere, the minimum latitude defines coverage
else:
pole = "s"
min_lat_index = 1 # in the southern hemisphere, the maximum latitude defines coverage
combo = [
(res, minlat)
for res, minlat in zip(AVAILABLE_RESOLUTIONS, MIN_LAT, strict=True)
if abs(lat_range[min_lat_index]) >= minlat
]
if len(combo) == 0:
raise ValueError("No available LOLA polar DEM files cover the requested latitude range.")
valid_resolutions, valid_min_lat = zip(*combo, strict=True)
diffs = [abs(resolution - res) for res in valid_resolutions]
pds_file_resolution = valid_resolutions[np.argmin(diffs)]
pds_lat_min = valid_min_lat[np.argmin(diffs)]
filename = f"ldem_{pds_lat_min:d}{pole}_{pds_file_resolution:d}m"
url = f"{src_url}{filename}_float.xml"
return [url], pds_file_resolution
[docs]
@staticmethod
def get_lola_dem_file_list(
pix: FloatLike,
lat_range: PairOfFloats,
lon_range: PairOfFloats,
) -> tuple[list[str], int]:
"""
Determine the set of LOLA DEM files needed to cover the local region.
Parameters
----------
pix : FloatLike
The approximate resolution in meters per pixel to use to select the DEM files.
lat_range : PairOfFloats, optional
The (min_lat, max_lat) in degrees of the local region.
lon_range : PairOfFloats, optional
The (min_lon, max_lon) in degrees of the local region.
"""
target_pds_resolution = np.pi / 180.0 * 1737.53e3 / pix # The moon's radius
if target_pds_resolution > 10 and (
lat_range[0] > 60 or lat_range[1] < -60
): # Use polar files high latitude, high resolution regions.
return DataComposer.get_lola_polar_files_from_pds(pix, lat_range=lat_range)
else: # Use cylindrical for all other cases
return DataComposer.get_lola_cylindrical_files_from_pds(
resolution=target_pds_resolution, lat_range=lat_range, lon_range=lon_range
)
[docs]
def populate_with_lola_data(self, pix: FloatLike | None = None):
"""
Loads the DEM files from LOLA needed to cover the surface.
Parameters
----------
pix : FloatLike
The approximate resolution in meters per pixel to use to select the DEM files.
Defaults to the resolution of the surface used.
"""
if pix is None:
pix = self._localsurface.pix
if self._localsurface.is_local:
lon_min, lon_max, lat_min, lat_max = self._localsurface.get_location_extents()
self.add_data(DataComposer.get_lola_dem_file_list(pix, lat_range=(lat_min, lat_max), lon_range=(lon_min, lon_max))[0])
else:
self.add_data(DataComposer.get_lola_dem_file_list(pix, lat_range=(-60, 60), lon_range=(-180, 180))[0])
self.add_data(DataComposer.get_lola_polar_files_from_pds(pix, lat_range=(60, -60))[0])
self.add_data(DataComposer.get_lola_polar_files_from_pds(pix, lat_range=(-60, 60))[0])
@property
def localsurface(self) -> LocalSurface:
"""
The ``LocalSurface`` used to create the ``DataComposer``.
"""
return self._localsurface
@property
def surface(self) -> Surface:
"""
The ``Surface`` used to create the ``DataComposer``.
"""
return self._localsurface.surface
@property
def finished(self):
"""
Whether ``finish()`` or ``cancel()`` has been called or not.
"""
return self._finished
import_components(__name__, __path__)