from __future__ import annotations
from abc import abstractmethod
from collections.abc import Sequence
from pathlib import Path
from typing import Any, Literal
import numpy as np
from numpy.random import Generator
from numpy.typing import ArrayLike, NDArray
from scipy.optimize import root_scalar
from cratermaker.components.crater import Crater
from cratermaker.components.target import Target
from cratermaker.constants import _DSTD, FloatLike, PairOfFloats
from cratermaker.core.base import ComponentBase, import_components
from cratermaker.utils import montecarlo_utils as mc
from cratermaker.utils.general_utils import format_large_units, parameter
from cratermaker.utils.montecarlo_utils import bounded_norm
[docs]
class Production(ComponentBase):
"""
The base class for computing the production function for craters and projectiles.
"""
_registry: dict[str, Production] = {}
def __init__(
self,
quasimc_file: str | Path | None = None,
quasimc_craters: list[Crater] | None = None,
diameter_range: PairOfFloats | None = None,
N_conversion_factor: float | None = None,
D_conversion_factor: float | None = None,
rng: Generator | None = None,
rng_seed: int | None = None,
rng_state: dict | None = None,
**kwargs: Any,
):
"""
**Warning:** This object should not be instantiated directly. Instead, use the ``.maker()`` method.
Parameters
----------
quasimc_file : str | Path, optional
Path to a file (CSV or NetCDF) containing the parameters used for craters emplaced using the quasi-Monte Carlo method. This file should contain at a minimum the diameter (or radius) of each crater, its location (lon, lat), and one of either production_time or production_D and production_N (D in km, N in units given by the N_conversion_factor attribute).
quasimc_craters : list[Crater], optional
A list of Crater objects that are emplaced using the quasi-Monte Carlo method. This is an alternative to providing a quasimc_file. Only one of either quasimc_file or quasimc_craters should be provided.
diameter_range : PairOfFloats, optional
The minimum and maximum crater diameter to sample from in meters. If not provided, the default is (0, inf), unless quasimc_file or quasimc_craters is provided, in which case the upper range will be set based on the smallest diameter in the quasi-Monte Carlo data.
N_conversion_factor : float, optional
The conversion factor to apply to the N values in the quasi-Monte Carlo data to convert them to units of number per m². The default is 1e12, which represents number of craters per 10⁶ km²
D_conversion_factor : float, optional
The conversion factor to apply to the D values in the quasi-Monte Carlo data to convert them to units of meters. The default is 1e3, which represents diameters in km.
rng : numpy.random.Generator | None
|rng|
rng_seed : Any type allowed by the rng_seed argument of numpy.random.Generator, optional
|rng_seed|
rng_state : dict, optional
|rng_state|
**kwargs : Any
|kwargs|
"""
super().__init__(rng=rng, rng_seed=rng_seed, rng_state=rng_state, **kwargs)
object.__setattr__(self, "_valid_generator_types", ["crater", "projectile"])
object.__setattr__(self, "_quasimc_file", None)
object.__setattr__(self, "_quasimc_craters", [])
object.__setattr__(self, "_N_conversion_factor", None)
object.__setattr__(self, "_D_conversion_factor", None)
object.__setattr__(self, "_diameter_range", None)
object.__setattr__(self, "_Crater", None)
self.diameter_range = diameter_range
if quasimc_file is not None and quasimc_craters is not None:
raise ValueError("Cannot provide both quasimc_file and quasimc_craters")
if quasimc_file is not None:
self.quasimc_file = quasimc_file
if quasimc_craters is not None:
self.quasimc_craters += quasimc_craters
self.N_conversion_factor = N_conversion_factor
self.D_conversion_factor = D_conversion_factor
def __str__(self) -> str:
str_repr = super().__str__()
str_repr += f"Generator type {self.generator_type}\n"
str_repr += f"Diameter range: {format_large_units(self.diameter_range[0], quantity='length')} to {format_large_units(self.diameter_range[1], quantity='length')}\n"
str_repr += f"N(D) units: {self.N_D_units}\n"
if self.quasimc_file is not None:
str_repr += "Quasi-Monte Carlo file: {self.quasimc_file}\n"
if self.quasimc_craters is not None:
str_repr += f"Number of quasi-Monte Carlo craters: {len(self.quasimc_craters)}\n"
return str_repr
[docs]
@classmethod
def maker(
cls,
production: str | Production | None = None,
target: Target | str | None = None,
quasimc_file: str | Path | None = None,
quasimc_craters: list[Crater] | None = None,
diameter_range: PairOfFloats | None = None,
N_conversion_factor: float | None = None,
D_conversion_factor: float | None = None,
rng: Generator | None = None,
rng_seed: int | None = None,
rng_state: dict | None = None,
**kwargs: Any,
) -> Production:
"""
Initialize a Production model with the given name or instance.
Parameters
----------
production : str | Production | None, optional
The production model to use. This can be either a string or a Production instance.
If None, the default production model is "neukum" and the version is based on the target (if provided), either Moon, Mars, or Projectile for all other bodies. Default is "Moon"
target : Target | str | None, optional
The target body for the impact. Can be a Target object or a string representing the target name.
quasimc_file : str | Path, optional
Path to a file (CSV or NetCDF) containing the parameters used for craters emplaced using the quasi-Monte Carlo method. This file should contain at a minimum the diameter (or radius) of each crater, its location (lon, lat), and one of either production_time or production_D and production_N (D in km, N in units given by the N_conversion_factor attribute).
quasimc_craters : list[Crater], optional
A list of Crater objects that are emplaced using the quasi-Monte Carlo method. This is an alternative to providing a quasimc_file. Only one of either quasimc_file or quasimc_craters should be provided.
diameter_range : PairOfFloats, optional
The minimum and maximum crater diameter to sample from in meters. If not provided, the default is (0, inf), unless quasimc_file or quasimc_craters is provided, in which case the upper range will be set based on the smallest diameter in the quasi-Monte Carlo data.
N_conversion_factor : float, optional
The conversion factor to apply to the N values in the quasi-Monte Carlo data to convert them to units of number per m². The default is 1e12, which represents number of craters per 10⁶ km²
D_conversion_factor : float, optional
The conversion factor to apply to the D values in the quasi-Monte Carlo data to convert them to units of meters. The default is 1e3, which represents diameters in km.
rng : numpy.random.Generator | None
|rng|
rng_seed : Any type allowed by the rng_seed argument of numpy.random.Generator, optional
|rng_seed|
rng_state : dict, optional
|rng_state|
**kwargs : Any
|kwargs|
Returns
-------
Production
An instance of the specified production model.
Raises
------
KeyError
If the specified production model name is not found in the registry.
TypeError
If the specified production model is not a string or a subclass of Production.
ValueError
If there is an error initializing the production model.
"""
set_version = False
version = None
if production is None:
set_version = True
elif isinstance(production, str):
production = production.lower()
if production == "neukum":
set_version = True
if set_version:
version = kwargs.pop("version", None)
if target is None and version is not None and version in ["Moon", "Mars"]:
target = version
target = Target.maker(target, **kwargs)
if target.name in ["Mercury", "Venus", "Earth", "Moon", "Mars"]:
production = "neukum"
if version is None or version != "projectile":
if target.name in ["Moon", "Mars"]:
version = target.name
else:
version = "projectile"
else:
production = "powerlaw"
return super().maker(
component=production,
version=version,
target=target,
quasimc_file=quasimc_file,
quasimc_craters=quasimc_craters,
diameter_range=diameter_range,
N_conversion_factor=N_conversion_factor,
D_conversion_factor=D_conversion_factor,
rng=rng,
rng_seed=rng_seed,
rng_state=rng_state,
**kwargs,
)
[docs]
def sample(
self,
time_start: FloatLike | None = None,
time_end: FloatLike | None = None,
diameter_number: PairOfFloats | None = None,
diameter_number_end: PairOfFloats | None = None,
diameter_range: PairOfFloats | None = None,
area: FloatLike | None = None,
compute_time: bool = True,
**kwargs: Any,
) -> np.ndarray:
"""
Sample diameters and ages from the production function.
This function can either sample from a given age range or
from a given cumulative number/diameter pair (but not both).
Parameters
----------
time_start : FloatLike, optional
Age in the past in units of My relative to the present, which is used compute the cumulative SFD.
time_end, FloatLike, optional
The ending age in units of My relative to the present, which is used to compute the cumulative SFD. The default is 0 (present day).
diameter_number : PairOfFloats, optional
A pair of diameter and cumulative number values, in the form of a (D, N). If provided, the function convert this value
to a corresponding age and use the production function for a given age.
diameter_number_end : PairOfFloats, optional
A pair of diameter and cumulative number values, in the form of a (D, N). If provided, the function will convert this
value to a corresponding reference age and use the production function for a given age.
diameter_range : PairOfFloats, optional
The minimum and maximum crater diameter to sample from in meters. If not provided, the :py:attr:`~cratermaker.components.production.Production.diameter_range` value will be used.
area : FloatLike, optional
The area in m² over which the production function is evaluated to generate the expected number, which is the production
function over the input age/cumulative number range at the minimum diameter.
compute_time : bool, optional
If True, the function will return the sampled ages in addition to the diameters. The default is True.
**kwargs: Any
Any additional keywords that are passed to |production.function|.
Returns
-------
numpy array
The sampled diameter values
numpy array or None
The sampled age values if compute_time is True, otherwise None.
"""
arguments = {
"time_start": time_start,
"time_end": time_end,
"diameter_number": diameter_number,
"diameter_number_end": diameter_number_end,
"diameter_range": diameter_range,
"area": area,
"compute_time": compute_time,
**kwargs,
}
arguments = self._validate_sample_args(**arguments)
time_start = arguments["time_start"]
time_end = arguments["time_end"]
diameter_range = arguments["diameter_range"]
area = arguments["area"]
compute_time = arguments["compute_time"]
validate_inputs = kwargs.pop("validate_inputs", False)
# Build the cumulative distribution function from which we will sample
input_diameters = np.logspace(np.log10(diameter_range[0]), np.log10(diameter_range[1]))
cdf = self.function(
diameter=input_diameters,
time_start=time_start,
time_end=time_end,
validate_inputs=validate_inputs,
**kwargs,
)
if cdf.ndim > 1:
cdf = cdf[:, 0]
expected_num = cdf[0] * area if area is not None else None
diameters = mc.get_random_size(
diameters=input_diameters,
cdf=cdf,
mu=expected_num,
**vars(self.common_args),
)
if diameters.size == 0:
return np.empty(0), np.empty(0)
if compute_time:
timevals = self.compute_time(diameters=diameters, time_start=time_start, time_end=time_end)
else:
timevals = np.empty(0)
return diameters, timevals
[docs]
def compute_time(self, diameters: ArrayLike, time_start: float, time_end: float = 0.0, **kwargs: Any) -> np.ndarray:
"""
Compute the ages of craters with given diameters based on the production function.
Parameters
----------
diameters : ArrayLike
The diameters of the craters in meters.
time_start : float
The starting age in My relative to the present, which is used to compute the cumulative SFD.
time_end : float, optional
The ending age in My relative to the present, which is used to compute the cumulative SFD. The default is 0 (present day).
**kwargs: Any
Any additional keywords that are passed to |production.function|.
Returns
-------
numpy array
The computed ages for the given diameters, sorted in order of decreasing age.
"""
time_subinterval = np.linspace(time_end, time_start, num=1000)
diameters = np.atleast_1d(diameters)
if len(diameters) == 0:
return []
N_vs_age = self.function(
diameter=diameters,
time_start=time_subinterval,
validate_inputs=False,
**kwargs,
)
# Normalize the weights for each diameter
if N_vs_age.ndim > 1:
N_vs_age -= np.min(N_vs_age, axis=1)[:, None] # Subtract the min along age dimension
N_vs_age = N_vs_age[:, :-1] # Remove the last value to avoid the cumulative sum reaching 1
weights_sum = np.sum(N_vs_age, axis=1)[:, None] # Sum of weights for each diameter
weights = N_vs_age / weights_sum # Normalize weights for each diameter
# Compute the CDF for each diameter
cdf = np.cumsum(weights, axis=1)
else:
if len(N_vs_age) > 0:
N_vs_age -= np.min(N_vs_age) # Subtract the min along age dimension
N_vs_age = N_vs_age[:-1] # Remove the last value to avoid the cumulative sum reaching 1
weights_sum = np.sum(N_vs_age) # Sum of weights for each diameter
weights = N_vs_age / weights_sum # Normalize weights for each diameter
# Compute the CDF for each diameter
cdf = np.cumsum(weights)
else:
cdf = np.empty(0)
# Generate uniform random numbers for each diameter
random_values = self.rng.uniform(0, 1, size=diameters.size)
if N_vs_age.ndim > 1:
chosen_subinterval_indices = np.zeros(diameters.size, dtype=int)
# For each diameter, find the corresponding index in the CDF
for i, cdf_row in enumerate(cdf):
# Find the corresponding index in the CDF for each diameter
chosen_subinterval_indices[i] = np.searchsorted(cdf_row, random_values[i], side="right") - 1
else:
chosen_subinterval_indices = np.searchsorted(cdf, random_values[:], side="right") - 1
# Ensure indices are within valid range
chosen_subinterval_indices = np.clip(chosen_subinterval_indices, 0, len(time_subinterval) - 2)
# Sample a random age within the selected subinterval for each diameter
time_lo = time_subinterval[chosen_subinterval_indices]
time_hi = time_subinterval[chosen_subinterval_indices + 1]
timevals = self.rng.uniform(low=time_lo, high=time_hi)
# Sort the ages and diameters so that they are in order of decreasing age
if timevals.size > 1:
sort_indices = np.argsort(timevals)[::-1]
diameters = diameters[sort_indices]
timevals = timevals[sort_indices]
return timevals
[docs]
@abstractmethod
def function(
self,
diameter: FloatLike | Sequence[FloatLike] | ArrayLike = 1.0,
time_start: FloatLike | Sequence[FloatLike] | ArrayLike = 1.0,
time_end: FloatLike | Sequence[FloatLike] | ArrayLike | None = None,
**kwargs: Any,
) -> NDArray[np.float64]: ...
[docs]
def age_from_D_N(
self,
diameter: FloatLike | Sequence[FloatLike] | ArrayLike,
cumulative_number_density: FloatLike | Sequence[FloatLike] | ArrayLike,
validate_inputs: bool = True,
**kwargs: Any,
) -> FloatLike | ArrayLike:
"""
Return the age in My for a given number density of craters and diameter.
Parameters
----------
diameter : float-like or array-like
diameter of the crater in m
cumulative_number_density : float-like or array-like
number density of craters per m² surface area greater than the input diameter
validate_inputs : bool, optional
If True, the function will validate the inputs. The default is True.
**kwargs: Any
Any additional keywords that are passed to |production.function|.
Returns
-------
float_like or numpy array
The age in My for the given relative number density of craters.
"""
if validate_inputs:
diameter, cumulative_number_density = self._validate_csfd(
diameter=diameter, cumulative_number_density=cumulative_number_density
)
def _root_func(t, d, n):
kwargs.pop("validate_inputs", None)
retval = self.function(diameter=d, time_start=t, validate_inputs=False, **kwargs) - n
return retval
xtol = 1e-8
x0 = 4000.0
if self.valid_time[0] is not None:
xlo = self.valid_time[0]
else:
xlo = 0.0
if self.valid_time[1] is not None:
xhi = self.valid_time[1]
else:
xhi = 1e6
retval = []
darr = np.array(diameter)
narr = np.array(cumulative_number_density)
converged = []
flag = []
for i, d in np.ndenumerate(darr):
dval = float(d.item())
nval = float(narr[i].item())
sol = root_scalar(
lambda t, d=dval, n_i=nval: _root_func(t, d, n_i),
x0=x0,
xtol=xtol,
method="brentq",
bracket=[xlo, xhi],
)
retval.append(sol.root)
converged.append(sol.converged)
flag.append(sol.flag)
retval = np.array(retval)
converged = np.array(converged)
flag = np.array(flag)
if np.all(converged):
return retval.item() if np.isscalar(diameter) else retval
else:
raise ValueError(
f"The root finding algorithm did not converge for all values of diameter and cumulative_number. Flag {flag}"
)
[docs]
def D_from_N_age(
self,
cumulative_number_density: FloatLike | Sequence[FloatLike] | ArrayLike,
time_start: FloatLike | Sequence[FloatLike] | ArrayLike,
time_end: FloatLike | Sequence[FloatLike] | ArrayLike | None = None,
validate_inputs: bool = True,
**kwargs: Any,
) -> FloatLike | ArrayLike:
"""
Return the diameter for a given number density of craters and age.
Parameters
----------
cumulative_number_density : float-like or array-like
number density of craters per m² surface area greater than the input diameter
time_start : FloatLike or ArrayLike, default=1.0
Age in the past in units of My relative to the present, which is used compute the cumulative SFD.
time_end, FloatLike or ArrayLike, optional
The ending age in units of My relative to the present, which is used to compute the cumulative SFD. The default is 0 (present day).
validate_inputs : bool, optional
If True, the function will validate the inputs. The default is True.
**kwargs: Any
Any additional keywords that are passed to |production.function|.
Returns
-------
float_like or numpy array
The diameter for the given number density and age.
"""
if "age" in kwargs:
time_start = kwargs.pop("age")
if validate_inputs:
time_start, time_end = self._validate_age(time_start, time_end)
def _root_func(d, n, time_start, time_end):
kwargs.pop("validate_inputs", None)
retval = (
self.function(
diameter=d,
time_start=time_start,
time_end=time_end,
validate_inputs=False,
**kwargs,
)
- n
)
return retval
xtol = 1e-8
x0 = 1.0 # Initial guess for the diameter
xlo = 1e-6
xhi = 1e6
retval = []
narr = np.array(cumulative_number_density)
converged = []
flag = []
for _, n in np.ndenumerate(narr):
nval = float(n.item())
sol = root_scalar(
lambda d, n=nval, time_start=time_start, time_end=time_end: _root_func(d, n, time_start, time_end),
x0=x0,
xtol=xtol,
method="brentq",
bracket=[xlo, xhi],
)
retval.append(sol.root)
converged.append(sol.converged)
flag.append(sol.flag)
retval = np.array(retval)
converged = np.array(converged)
flag = np.array(flag)
if np.all(converged):
return retval.item() if np.isscalar(cumulative_number_density) else retval
else:
raise ValueError(
f"The root finding algorithm did not converge for all values of cumulative_number and age. Flag {flag}"
)
[docs]
@abstractmethod
def chronology(
self,
time_start: FloatLike | Sequence[FloatLike] | ArrayLike = 1.0,
time_end: FloatLike | Sequence[FloatLike] | ArrayLike = 0.0,
validate_inputs: bool = True,
**kwargs: Any,
) -> FloatLike | ArrayLike: ...
[docs]
@abstractmethod
def csfd(self, diameter: FloatLike | ArrayLike, **kwargs: Any) -> FloatLike | ArrayLike: ...
def _validate_csfd(
self,
diameter: FloatLike | Sequence[FloatLike] | ArrayLike | None = None,
cumulative_number_density: FloatLike | Sequence[FloatLike] | ArrayLike | None = None,
) -> tuple[FloatLike | ArrayLike, FloatLike | ArrayLike]:
"""
Validates the diameter and cumulative_number arguments.
Both arguments can be either scalar or array-like, but they must be both scalars or both arrays of the same length. Values must be non-negative.
Parameters
----------
diameter : float-like or array-like, optional
Diameter of the impacts
cumulative_number_density: float-like or array-like, optional
Cumulative number of impacts larger than `diameter`
Raises
------
ValueError
If any of the conditions on the CSFD are not met.
"""
if diameter is None and cumulative_number_density is None:
raise ValueError("Either the 'diameter' or 'cumulative_number_density' must be provided")
# Convert inputs to numpy arrays for uniform processing
if diameter is not None:
diameter_array = np.atleast_1d(diameter)
else:
diameter_array = None
if cumulative_number_density is not None:
cumulative_number_density_array = np.atleast_1d(cumulative_number_density)
else:
cumulative_number_density_array = None
# Check if both are provided, they should be of the same length
if diameter_array is not None and cumulative_number_density_array is not None:
# Check for length consistency
if len(diameter_array) != len(cumulative_number_density_array):
raise ValueError("The 'diameter' and 'cumulative_number_density' must have the same length when both are provided")
# Validate non-negative values
if diameter_array is not None and np.any(diameter_array < 0):
raise ValueError("All values in 'diameter' must be non-negative")
if cumulative_number_density_array is not None and np.any(cumulative_number_density_array < 0):
raise ValueError("All values in 'cumulative_number_density' must be non-negative")
if diameter is not None and not np.isscalar(diameter):
diameter = diameter_array
if cumulative_number_density is not None and not np.isscalar(cumulative_number_density):
cumulative_number_density = cumulative_number_density_array
return diameter, cumulative_number_density
def _validate_sample_args(self, **kwargs: dict) -> dict:
"""
Validate all the input arguments to the sample method.
This function will raise a ValueError if any of the arguments are invalid. It will also convert age arguments to diameter_number and vice versa.
Parameters
----------
time_start : FloatLike, optional
The starting time in units of My relative to the present, which is used to compute the cumulative SFD.
age : FloatLike, optional
The starting time in units of My relative to the present, which is used to compute the cumulative SFD (a substitute for time_start).
time_end, FloatLike, optional
The ending time in units of My relative to the present, which is used to compute the cumulative SFD.
The default is 0 (present day).
diameter_number : PairOfFloats, optional
A pair of diameter and cumulative number values, in the form of a (D, N), which gives the total cumulative number of
projectiles, N, larger than diameter, D. If provided, the function convert this value to a corresponding age and use the
production function for a given age.
diameter_number_end : PairOfFloats, optional
A pair of diameter and cumulative number values, in the form of a (D, N), which gives the total cumulative number of
projectiles, N, larger than diameter, D.. If provided, the function will convert this value to a corresponding time_end
and use the production function for a given age. The default is (1000.0, 0) (present day).
diameter_range : PairOfFloats
The minimum and maximum crater diameter to sample from in meters. If not provided, the :py:attr:`~cratermaker.components.production.Production.diameter_range`
N_D : PairOfFloats
A pair of D, N values that represent the N(D) format for the cumulative number density at the start of the sample
N_D_end : PairOfFloats
A pair of D, N values that represent the N(D) format for the cumulative number density at the end of the sample.
area : FloatLike, optional
The area in m² over which the production function is evaluated to generate the expected number, which is the production
function over the input age/cumulative number range at the minimum diameter.
compute_time : bool, optional
If True, the function will return the sampled ages in addition to the diameters. The default is True.
Returns
-------
FloatLike
The start time in units of My relative to the present.
FloatLike
The end time in units of My relative to the present.
PairOfFloats
A pair of diameter and cumulative number values, in the form of a (D, N) representing the cumulative number and diameter values for a surface at the age given by the start time
PairOfFloats
A pair of diameter and cumulative number values, in the form of a (D, N) representing the cumulative number and diameter values for a surface at the age given by the end time.
PairOfFloats
The minimum and maximum diameter values to sample from in meters.
FloatLike
The area in m² over which the production function is evaluated to generate the expected number, which is the production
function over the input time/cumulative number range at the minimum diameter.
Raises
------
ValueError
If any of the following conditions are met:
- Neither the age nore the diameter_number argument is provided.
- Both the age and diameter_number arguments are provided.
- Both the time_end and diameter_number_end arguments are provided.
- The time_start or age argument is provided but is not a scalar.
- The time_end argument is provided but is not a scalar.
- The diameter_number argument is not a pair of values, or any of them are less than 0
- The diameter_number_end argument is not a pair of values, or any of them are less than 0
- The diameter_range is not a pair of values.
- The minimum diameter is less than or equal to 0.
- The maximum diameter is less than or equal the minimum.
- The area argument is not a scalar or is less than 0.
"""
_REF_DIAM = 1000.0 # The diameter value used if age arguments are provided
age = kwargs.pop("age", None)
if age is not None:
kwargs["time_start"] = age
time_start = kwargs.get("time_start")
time_end = kwargs.get("time_end")
N_D = kwargs.pop("N_D", None)
N_D_end = kwargs.pop("N_D_end", None)
if N_D is not None:
if time_start is not None:
raise ValueError("Cannot provide both N_D and time_start")
time_start = self.age_from_D_N(
diameter=N_D[0] * self.D_conversion_factor, cumulative_number_density=N_D[1] * self.N_conversion_factor
)
if N_D_end is not None:
if time_end is not None:
raise ValueError("Cannot provide both N_D_end and time_end")
time_end = self.age_from_D_N(
diameter=N_D_end[0] * self.D_conversion_factor, cumulative_number_density=N_D_end[1] * self.N_conversion_factor
)
diameter_number = kwargs.get("diameter_number")
diameter_number_end = kwargs.get("diameter_number_end")
diameter_range = kwargs.pop("diameter_range", self.diameter_range)
area = kwargs.get("area")
compute_time = kwargs.get("compute_time", True)
if time_start is None and diameter_number is None:
raise ValueError("Either the 'time_start' or 'diameter_number' must be provided")
elif time_start is not None and diameter_number is not None:
raise ValueError("Only one of the 'time_start' or 'diameter_number' arguments can be provided")
if time_end is not None and diameter_number_end is not None:
raise ValueError("Only one of the 'time_end' or 'diameter_number_end' arguments can be provided")
if time_start is not None and not np.isscalar(time_start):
raise ValueError("The 'time_start' must be a scalar")
if time_end is not None and not np.isscalar(time_end):
raise ValueError("The 'time_end' must be a scalar")
if len(diameter_range) != 2:
raise ValueError("The 'diameter_range' must be a pair of values")
if diameter_range[0] <= 0:
raise ValueError(f"Diameter range invalid: {diameter_range}. The minimum diameter must be greater than 0")
if diameter_range[1] < diameter_range[0]:
raise ValueError(
f"Diameter range invalid: {diameter_range}. The maximum diameter must be greater than or equal to the minimum diameter"
)
if area is None:
area = 1.0
else:
if not np.isscalar(area):
raise ValueError("The 'area' must be a scalar")
if area < 0.0:
raise ValueError("The 'area' must be greater than 0")
if diameter_number is not None:
if len(diameter_number) != 2:
raise ValueError("The 'diameter_number' must be a pair of values in the form (D, N)")
diameter_number = self._validate_csfd(*diameter_number)
diameter_number_density = (diameter_number[0], diameter_number[1] / area)
time_start = self.age_from_D_N(*diameter_number_density)
else:
diameter_number_density = (
_REF_DIAM,
self.function(diameter=_REF_DIAM, time_start=time_start),
)
diameter_number = (
diameter_number_density[0],
diameter_number_density[1] * area,
)
if time_end is None and diameter_number_end is None:
diameter_number_end = (_REF_DIAM, 0.0)
diameter_number_density_end = diameter_number_end
time_end = 0.0
if diameter_number_end is not None:
if len(diameter_number_end) != 2:
raise ValueError("The 'diameter_number_end' must be a pair of values in the form (D,N)")
diameter_number_end = self._validate_csfd(*diameter_number_end)
diameter_number_density_end = (
diameter_number_end[0],
diameter_number_end[1] / area,
)
# Check to be sure that the diameter in the diameter_number_end is the same as diameter_number.
# If not, we need to adjust the end diameter value it so that they match
if diameter_number is not None:
if diameter_number_density_end[0] != diameter_number_density[0]:
diameter_number_density_end = (
diameter_number_density[0],
self.function(diameter=diameter_number_density[0], time_start=time_end),
)
diameter_number_end = (
diameter_number[0],
diameter_number_density_end[1] * area,
)
if time_end is None:
diameter_number_end = self._validate_csfd(*diameter_number_end)
diameter_number_density_end = (
diameter_number_end[0],
diameter_number_end[1] / area,
)
time_end = self.age_from_D_N(*diameter_number_density_end)
time_start, time_end = self._validate_age(time_start, time_end)
if diameter_number_end is None:
diameter_number_end = (
diameter_number[0],
self.function(diameter=diameter_number[0], time_start=time_end) * area,
)
if not isinstance(compute_time, bool):
raise TypeError("The 'compute_time' argument must be a boolean")
kwargs["time_start"] = time_start
kwargs["time_end"] = time_end
kwargs["diameter_number"] = diameter_number
kwargs["diameter_number_end"] = diameter_number_end
kwargs["diameter_range"] = diameter_range
kwargs["area"] = area
kwargs["compute_time"] = compute_time
return kwargs
def _validate_age(
self,
time_start: FloatLike | Sequence[FloatLike] | ArrayLike = 1.0,
time_end: FloatLike | Sequence[FloatLike] | ArrayLike | None = None,
) -> FloatLike | ArrayLike:
"""
Processes the time_start argument and time_end arguments. Checks that they are valid and returns a tuple of age and time_end.
Parameters
----------
time_start : FloatLike or ArrayLike, default=1.0
Age in the past in units of My relative to the present, which is used compute the cumulative SFD.
time_end, FloatLike or ArrayLike, optional
The reference used when computing age in My. If none is passed, it will be set either 0 or an array of zeros, depending on
the size of time_start.
Returns
-------
tuple of float
The start and end ages in units of My.
Raises
------
ValueError
If the the start age is greater than the end age or the age variable is not a scalar or a sequence of 2 values.
"""
if np.isscalar(time_start):
if not isinstance(time_start, FloatLike):
raise TypeError("time_start must be a numeric value (float or int)")
if time_end is None:
time_end = 0.0
elif not np.isscalar(time_end) or not isinstance(time_end, FloatLike):
raise TypeError("time_end must be a numeric value (float or int)")
if time_start < time_end:
raise ValueError("time_start must be greater than the time_end")
if self.valid_time[0] is not None and time_start < self.valid_time[0]:
raise ValueError(
f"time_start must be greater than the minimum valid time {self.valid_time[0]} (it is units of My before present)"
)
if self.valid_time[1] is not None and time_start > self.valid_time[1]:
raise ValueError(
f"time_start must be less than the maximum valid time {self.valid_time[1]} (it is in units of My before present)"
)
else:
if isinstance(time_start, (list, tuple)):
time_start = np.array(time_start, dtype=np.float64)
elif not isinstance(time_start, np.ndarray):
raise TypeError("time_start must be a numeric value (float or int) or an array")
if time_end is None:
time_end = np.zeros_like(time_start, dtype=np.float64)
elif np.isscalar(time_end):
time_end = np.full_like(time_start, time_end, dtype=np.float64)
elif isinstance(time_end, (list, tuple)):
time_end = np.array(time_end, dtype=np.float64)
elif not isinstance(time_end, np.ndarray):
raise TypeError("time_end must be a numeric value (float or int) or an array")
if time_start.size != time_end.size:
raise ValueError("The 'time_start' and 'time_end' arguments must be the same size if both are provided")
if np.any(time_start < time_end):
raise ValueError("time_start must be greater than the time_end")
if self.valid_time[0] is not None:
if np.any(time_start < self.valid_time[0]) or np.any(time_end < self.valid_time[0]):
raise ValueError(
f"time_start must be greater than the minimum valid age {self.valid_time[0]} (it is in units of My before present)"
)
if self.valid_time[1] is not None:
if np.any(time_start > self.valid_time[1]) or np.any(time_end > self.valid_time[1]):
raise ValueError(
f"time_start must be less than the maximum valid age {self.valid_time[1]} (it is in units of My before present)"
)
return time_start, time_end
[docs]
def quasimc_merge(
self,
craters: list[Crater],
time_start: float | None = None,
time_end: float | None = None,
N_D: PairOfFloats | None = None,
N_D_end: PairOfFloats | None = None,
**kwargs,
):
"""
Merge a randomly-generated list of craters with the quasi-Monte Carlo craters over a given time interval.
It will extract any craters from |production.quasimc_craters| that have time values that overlap those of the arguments. Then it will successively check the to see if any in the "craters" list are larger than the largest in
Parameters
----------
craters : list[Crater]
The list of Crater objects to merge with the quasi-Monte Carlo craters.
time_start : float, optional
The starting time in units of My relative to the present in which to extract the quasi-Monte Carlo craters.
time_end : float, optional
The ending time in units of My relative to the present in which to extract the quasi-Monte Carlo craters.
N_D : PairOfFloats, optional
A pair of D, N values that represent the N(D) format for the cumulative number density at the start of the sample, which is used to extract the quasi-Monte Carlo craters. The units given by |production.N_D_units| and can be adjusted by setting |production.D_conversion_factor| and |production.N_conversion_factor|.
N_D_end : PairOfFloats, optional
A pair of D, N values that represent the N(D) format for the cumulative number density at the end of the sample, which is used to extract the quasi-Monte Carlo craters. The units given by |production.N_D_units| and can be adjusted by setting |production.D_conversion_factor| and |production.N_conversion_factor|.
**kwargs: Any
|kwargs|
Returns
-------
list[Crater]
The merged list of Crater objects, sorted in order of decreasing age.
"""
if not isinstance(craters, (list, tuple)) or (len(craters) > 0 and any(not isinstance(c, Crater) for c in craters)):
raise ValueError("craters must be a list of Crater objects.")
arguments = {
"time_start": time_start,
"time_end": time_end,
"N_D": N_D,
"N_D_end": N_D_end,
**kwargs,
}
arguments = self._validate_sample_args(**arguments)
time_start = arguments["time_start"]
time_end = arguments["time_end"]
rnglist = []
if len(craters) > 0:
rnglist = [c for c in craters if c.time is not None and c.time <= time_start and c.time > time_end]
untimed_craters = [c for c in craters if c.time is None]
time_vals = self.compute_time(
diameters=np.array([c.diameter for c in untimed_craters]),
time_start=time_start,
time_end=time_end,
)
for c, t in zip(untimed_craters, time_vals, strict=True):
rnglist.append(self.Crater.maker(c, time=t))
rnglist.sort(key=lambda c: c.diameter, reverse=False)
if self.quasimc_craters is not None:
qmclist = [c for c in self.quasimc_craters if c.time <= time_start and c.time > time_end]
qmclist.sort(key=lambda c: c.diameter, reverse=False)
else:
return rnglist
merged = []
qidx = 0
nqmc = len(qmclist)
for c in rnglist:
if qidx < nqmc:
qmc = qmclist[qidx]
if c.diameter < qmc.diameter:
merged.append(c)
else:
merged.append(qmc)
qidx += 1
else:
merged.append(c)
if qidx < nqmc:
merged += qmclist[qidx:]
merged.sort(key=lambda c: c.time, reverse=True)
return merged
def _process_quasimc_craters(self, craters: list[Crater], **kwargs: Any) -> list[Crater]:
"""
Process a list of quasi-Monte Carlo craters to set their emplacement time using computing production_ND, production_time, and/or production_sequence if present, and will drop craters that have no production metadata.
Parameters
----------
craters : list[Crater]
The list of Crater objects to process.
Returns
-------
list[Crater]
The list of processed Crater objects with their emplacement time set, and any craters without production metadata dropped.
"""
def _draw_quasimc_time(
crater: Crater,
sequence_extents: dict[int, float] | None = None,
) -> float | None:
"""Draw one time sample for a crater, or None if no production metadata."""
if sequence_extents is not None:
s = crater.production_sequence
seq_nlo, seq_nhi = sequence_extents[s]
seq_nlo = max(seq_nlo, 0.0)
seq_nmean = (seq_nlo + seq_nhi) / 2
seq_nsig = (seq_nhi - seq_nlo) / 2
if seq_nsig / seq_nmean < 1e-12: # Prevents an error when the bounds are too tight
seq_nsig = 0.0
seq_tlo = self.age_from_D_N(diameter=_DSTD, cumulative_number_density=seq_nlo)
seq_thi = self.age_from_D_N(diameter=_DSTD, cumulative_number_density=seq_nhi)
seq_tmean = (seq_tlo + seq_thi) / 2
seq_tsig = (seq_thi - seq_tlo) / 2
if seq_tsig / seq_tmean < 1e-12: # Prevents an error when the bounds are too tight
seq_tsig = 0.0
else:
s = None
has_sequence = s is not None and sequence_extents is not None
if crater.production_ND is None and crater.production_time is None:
if not has_sequence:
raise ValueError(
"Craters without production_time or production_ND need a production_sequence and sequence_extents"
)
if seq_nsig > 0.0:
nval = bounded_norm(loc=seq_nmean, scale=seq_nsig, lower_bound=seq_nlo, upper_bound=seq_nhi, rng=self.rng)
else:
nval = seq_nmean
nval = max(nval, 0.0)
return float(self.age_from_D_N(diameter=_DSTD, cumulative_number_density=nval))
elif crater.production_ND is not None and crater.production_ND != [None, None, None]:
# N(D) values take precedence over time values
prod_diam, nmean, nsig = crater.production_ND
prod_diam *= 1e3
nmean /= self.N_conversion_factor
nsig /= self.N_conversion_factor
# Convert from N(prod_diam) to N(1) for alignment
fac = self.csfd(_DSTD) / self.csfd(prod_diam)
nmean *= fac
nsig *= fac
if has_sequence:
if nsig > 0 and seq_nsig > 0.0:
nval = bounded_norm(loc=nmean, scale=nsig, lower_bound=seq_nlo, upper_bound=seq_nhi, rng=self.rng)
else:
nval = max(min(nmean, seq_nhi), seq_nlo)
else:
nval = self.rng.normal(loc=nmean, scale=nsig) if nsig > 0 else nmean
nval = max(0.0, nval)
return float(self.age_from_D_N(diameter=_DSTD, cumulative_number_density=nval))
elif crater.production_time is not None and crater.production_time != [None, None]:
tmean, tsig = crater.production_time
if has_sequence:
if tsig > 0 and seq_tsig > 0.0:
t = bounded_norm(loc=tmean, scale=tsig, lower_bound=seq_tlo, upper_bound=seq_thi, rng=self.rng)
else:
t = max(min(tmean, seq_thi), seq_tlo)
else:
t = self.rng.normal(loc=tmean, scale=tsig) if tsig > 0 else tmean
if isinstance(t, np.ndarray):
t = t.item()
max(t, 0.0)
return float(t)
else:
return None
def _sequence_is_consistent(
times_by_idx: dict[int, float],
sequence_groups: dict[int, list[int]],
) -> bool:
"""
For increasing tag value, times must strictly decrease: sequence(a) < sequence(b) ==> time(a) > time(b).
"""
if any(t is None for t in times_by_idx.values()):
return False
ordered_sequence = sorted(sequence_groups.keys())[:-1]
for t_lo, t_hi in zip(ordered_sequence[:-1], ordered_sequence[1:], strict=True):
lo_min = min(
times_by_idx[i] for i in sequence_groups[t_lo]
) # all lower-sequence times must exceed higher-sequence times
hi_max = max(times_by_idx[i] for i in sequence_groups[t_hi])
if not (lo_min >= hi_max):
return False
return True
def _get_sequence_extents(valid_craters: list[Crater], sequence_groups: dict[int, list[int]]) -> dict[int, float]:
"""
Determine the N(1) extents of each sequence number from the provided production metadata.
Parameters
----------
valid_craters : list[Crater]
The list of Crater objects to process
sequence_groups : dict[int, list[int]]
A dictionary with each key being a sequence number and the values are lists of indices to the entries in the valid_craters list. It is assumed that the sequence_groups are ordered by key so that the first entry is the lowest sequence number
Returns
-------
sequence_extents : dict[int, float]
A dictionary with each key being a sequence number and values are tuples with the upper and lower N(1) extent of the associated sequence
"""
if len(sequence_groups) == 0:
return {}
# First go through the sequence groups and get their production time/N(D) metadata, converting time values to N(1) values
sequence_extents: dict[int, float] = {}
nvals = {}
nsigvals = {}
sequence_numbers = list(sequence_groups.keys())
lowest_sequence_number = sequence_numbers[0]
highest_sequence_number = sequence_numbers[-1]
for s, idxs in sequence_groups.items():
n = []
sig = []
for i in idxs:
crater = valid_craters[i]
if crater.production_ND is not None and crater.production_ND != [None, None, None]:
prod_diam, nmean, nstdev = crater.production_ND
prod_diam *= 1e3
nmean /= self.N_conversion_factor
nstdev /= self.N_conversion_factor
# Convert from N(prod_diam) to N(1) for alignment
fac = self.csfd(_DSTD) / self.csfd(prod_diam)
nmean *= fac
nstdev *= fac
n.append(nmean)
sig.append(nstdev)
elif crater.production_time is not None and crater.production_time != [None, None]:
# Time doesn't map linearly with N(1), but to keep things consistent here for computing sequence boundaries we will convert the upper/lower 1 sigma bounds of production_time into upper/lower 1 sigma bounds on N(1). Later, craters with production_time+/-production_time_stdev values will be drawn using time-based values
tmean, tstdev = crater.production_time
tlo = max(tmean - tstdev, 0.0)
thi = tmean + tstdev
nlo = self.function(diameter=_DSTD, time_start=tlo)
nhi = self.function(diameter=_DSTD, time_start=thi)
nmean = (nhi + nlo) / 2
nstdev = (nhi - nlo) / 2
n.append(nmean)
sig.append(nstdev)
if len(n) > 0:
sig = np.sqrt(np.sum([u**2 for u in sig])) / len(n)
n = np.mean(n)
nvals[s] = n
nsigvals[s] = sig
else:
# Verify that the first entry has valid production metadata, otherwise we have no way to anchor the sequences to values of N(1)
if s == lowest_sequence_number:
raise ValueError(
"At least one crater with the lowest production_sequence number must have either a production_time or production_ND value"
)
elif s == highest_sequence_number:
nvals[s] = 0.0
nsigvals[s] = 0.0
else:
# Hold off on calculating the empty slots until all of the sequence numbers with production metadata have been computed
nvals[s] = None
nsigvals[s] = 0.0
# Linearlly interpolate using the N(1) vs sequence_number value to compute the mean N(1) value for any that doen't have one
empties = [s for s in sequence_numbers if nvals[s] is None]
filled = [s for s in sequence_numbers if nvals[s] is not None]
for s in empties:
nvals[s] = np.interp(s, xp=filled, fp=[nvals[s] for s in filled])
# Iteratively compute upper and lower extents until there is no overlap between any sequences
for _ in range(10):
for i, s in enumerate(sequence_numbers):
n = nvals[s]
sig = nsigvals[s]
nlo = max(n - sig, 0.0)
nhi = n + sig
if s != lowest_sequence_number:
sprev = sequence_numbers[i - 1]
nloprev = max(nvals[sprev] - nsigvals[sprev], 0.0)
nhi += (nloprev - nhi) / 2
if s != highest_sequence_number:
snext = sequence_numbers[i + 1]
nhinext = nvals[snext] + nsigvals[snext]
nlo += (nhinext - nlo) / 2
nlo = max(nlo, 0.0)
if nlo > nhi:
tmp = nlo
nlo = nhi
nhi = tmp
sequence_extents[s] = (float(nlo), float(nhi))
# Ensure that all boundaries are consistent such that nlo of one sequences is the same value as nhi of the next
for i, s in enumerate(sequence_numbers):
nlo, nhi = sequence_extents[s]
if s != lowest_sequence_number:
sprev = sequence_numbers[i - 1]
nlo_prev, nhi_prev = sequence_extents[sprev]
if nhi != nlo_prev:
nhi = (nlo_prev + nhi) / 2
sequence_extents[sprev] = (nhi, nhi_prev)
sequence_extents[s] = (nlo, nhi)
if s != highest_sequence_number:
snext = sequence_numbers[i + 1]
nlo_next, nhi_next = sequence_extents[snext]
if nlo != nhi_next:
nlo = (nlo + nhi_next) / 2
sequence_extents[snext] = (nlo_next, nlo)
sequence_extents[s] = (nlo, nhi)
nvals = {k: (nlo + nhi) / 2 for k, (nlo, nhi) in sequence_extents.items()}
nsigvals = {k: (nhi - nlo) / 2 for k, (nlo, nhi) in sequence_extents.items()}
# Check that both nlo and nhi are monotonically decreasing with increasing sequence number and break out if they are
nlo_values = [v[0] for v in sequence_extents.values()]
nhi_values = [v[1] for v in sequence_extents.values()]
if all(x >= y for x, y in zip(nlo_values[:-1], nlo_values[1:], strict=True)) and all(
x >= y for x, y in zip(nhi_values[:-1], nhi_values[1:], strict=True)
):
break
return sequence_extents
# Keep only craters that can produce a time
valid_craters: list[Crater] = []
for c in craters:
if (
(c.production_ND is not None and c.production_ND != [None, None, None])
or (c.production_time is not None and c.production_time != [None, None])
or (c.production_sequence is not None)
):
valid_craters.append(c)
else:
print(
f"Skipping crater {c.name if c.name is not None else format_large_units(c.diameter, quantity='length')} because it has no production metadata"
)
# Partition craters between those tagged with a sequence number vs untagged
tagged_idx: list[int] = []
untagged_idx: list[int] = []
sequence_groups: dict[int, list[int]] = {}
# Group tagged craters
for i, c in enumerate(valid_craters):
sequence = c.production_sequence
if sequence is None:
untagged_idx.append(i)
else:
sequence = int(sequence)
tagged_idx.append(i)
sequence_groups.setdefault(sequence, []).append(i)
sequence_groups = dict(sorted(sequence_groups.items()))
sequence_extents = _get_sequence_extents(valid_craters, sequence_groups)
# Draw untagged once (unconstrained)
times_by_idx: dict[int, float] = {}
for i in untagged_idx:
times_by_idx[i] = _draw_quasimc_time(valid_craters[i])
for i in tagged_idx:
times_by_idx[i] = None
# Draw tagged iteratively until constraints are satisfied
if tagged_idx:
for i in tagged_idx:
times_by_idx[i] = _draw_quasimc_time(valid_craters[i], sequence_extents)
# Check for consistency, then repeat if necessary
if not _sequence_is_consistent(times_by_idx, sequence_groups):
raise RuntimeError("Could not satisfy production_sequence ordering constraints ")
# Build output crater list
processed: list[self.Crater] = []
smallest_diameter = self.diameter_range[1]
for i, c in enumerate(valid_craters):
t = times_by_idx.get(i)
if t is None:
continue
processed.append(self.Crater.maker(crater=c, time=t))
if self.generator_type == "crater":
smallest_diameter = min(smallest_diameter, c.diameter)
else:
smallest_diameter = min(smallest_diameter, c.projectile_diameter)
self.diameter_range = (self.diameter_range[0], smallest_diameter)
processed.sort(key=lambda c: c.time, reverse=True)
return processed
@parameter
def generator_type(self):
"""
The type of generator to use.
This can be either "crater" or "projectile". This determines the nature of the production function, differentiating between 'crater' and 'projectile' types, which affect the calculations and interpretations of the production function.
"""
return self._generator_type
@generator_type.setter
def generator_type(self, value):
if not value:
self._generator_type = self._valid_generator_types[0]
return
if not isinstance(value, str):
raise TypeError("generator_type must be a string")
if value.lower() not in self._valid_generator_types:
raise ValueError(f"Invalid generator_type {value}. Must be one of {self._valid_generator_types}")
self._generator_type = value.lower()
return
@property
def valid_time(self) -> tuple[float, float]:
"""
The range of ages over which the production function is valid. The range is given in My.
Returns
-------
tuple
The lower and upper bounds of the valid time range in My, or None if not applicable.
"""
return (None, None)
@parameter
def quasimc_file(self) -> Path:
"""
File containing the quasi-Monte Carlo craters.
When assigned, the file will be read and processed to set the |production.quasimc_craters| list. The file must be in a format that can be read by the |Crater.from_file| method, and the craters must have production metadata (production_time, production_ND, and/or production_sequence) to be included in the quasimc_craters list. When accessed, it will return the Path to the file containing the quasi-Monte Carlo craters.
"""
return self._quasimc_file
@quasimc_file.setter
def quasimc_file(self, value) -> None:
if value is not None:
if isinstance(value, str):
value = Path(value)
if isinstance(value, Path):
if not value.exists():
raise FileNotFoundError(f"quasimc_file {str(value)} not found")
self._quasimc_file = value
else:
raise TypeError("quasimc_file must be a string or Path")
self.quasimc_craters = self.Crater.from_file(self._quasimc_file)
return
@property
def quasimc_craters(self) -> list[Crater]:
"""
List of craters to be emplaced using quasi-Monte Carlo.
When assigned a list of Crater objects with production metadata (production_time, production_ND, and/or production_sequence), they will be processed to set their |crater.time| value. Any craters without production metadata will be dropped. When accessed, if any of the craters have a None value for time, they will be reprocessed to set their time value. This allows for the quasimc_craters to be dynamically updated.
"""
if len(self._quasimc_craters) > 0:
reprocess = False
for crater in self._quasimc_craters:
if crater.time is None:
reprocess = True
break
if reprocess:
self._quasimc_craters = self._process_quasimc_craters(self._quasimc_craters)
return self._quasimc_craters
@quasimc_craters.setter
def quasimc_craters(self, value: list[Crater] | None):
if value is None or value == []:
self._quasimc_craters = []
elif not isinstance(value, list) or not all(isinstance(c, Crater) for c in value):
raise TypeError("quasimc_craters must be a list of Crater objects")
else:
self._quasimc_craters = self._process_quasimc_craters(value)
@property
def diameter_range(self) -> tuple[float, float]:
"""
The lower and upper range of diameters that will be returned from :py:meth:`~cratermaker.components.production.Production.sample`.
"""
return self._diameter_range
@diameter_range.setter
def diameter_range(self, value: PairOfFloats | None):
# Reset to default
if value is None:
self._diameter_range = (0, np.inf)
elif not isinstance(value, (list, tuple, ArrayLike)) or len(value) != 2:
raise TypeError("diameter_range must be a list, tuple, or array of length 2")
else:
self._diameter_range = (float(value[0]), float(value[1]))
return
@parameter
def D_conversion_factor(self) -> float:
"""
The conversion factor used to convert D in N(D) format into crater diameter in meters. The default value is 1e3, which corresponds to input diameter values in kilometers.
Only two options are allowed, 1 for units of m or 1000 for units of m. If None is provided, it will default to 1000 (km).
"""
if self._D_conversion_factor is None:
self._D_conversion_factor = 1000.0
return self._D_conversion_factor
@D_conversion_factor.setter
def D_conversion_factor(self, value: Literal[1.0, 1000.0] | None):
if value is None:
value = 1000.0
if value in [1.0, 1000.0]:
self._D_conversion_factor = value
else:
raise ValueError("D_conversion factor can only be 1 or 1000 (m or km)")
@property
def D_unit(self) -> str:
if self.D_conversion_factor == 1.0:
return "m"
elif self.D_conversion_factor == 1000.0:
return "km"
else:
return "UNKNOWN UNITS"
@parameter
def N_conversion_factor(self) -> float:
"""
The conversion factor used to convert N in N(D) format into number of craters per m².
Only three options are allowed, 1.0 for units of '# per m²', 1e6 for units of '# per km²', or 1e12 for units of per 10⁶ km². The default value is 1e12."
"""
if self._N_conversion_factor is None:
self._N_conversion_factor = 1.0e12
return self._N_conversion_factor
@N_conversion_factor.setter
def N_conversion_factor(self, value: Literal[1.0, 1e6, 1e12] | None):
if value is None:
value = 1e12
if value in [1.0, 1.0e6, 1.0e12]:
self._N_conversion_factor = value
else:
raise ValueError("N_conversion factor can only be 1.0, 1e6, or 1e12")
@property
def N_unit(self) -> float:
"""
Units of N in the N(D) format convention.
"""
if self._N_conversion_factor == 1.0:
return "per m²"
elif self._N_conversion_factor == 1.0e6:
return "per km²"
elif self._N_conversion_factor == 1.0e12:
return "per 10⁶ km²"
else:
return "UNKNOWN UNIT"
@property
def N_D_units(self) -> float:
"""
Units used in the N(D) format convention.
"""
return f"N>D({self.D_unit}) {self.N_unit}"
@property
def Crater(self) -> type[Crater]:
"""
The Crater class used for crater generation.
This is set by the Simulation object when initializing a crater.
"""
return self._Crater if self._Crater is not None else Crater
import_components(__name__, __path__)