import warnings
import numpy as np
import pandas as pd
from landlab import Component
from landlab.components import FlowDirectorMFD
from landlab.components.mass_wasting_runout.mass_wasting_saver import MassWastingSaver
[docs]
class MassWastingRunout(Component):
"""a cellular-automata mass wasting runout model that routes an initial mass
wasting body (e.g., a landslide) through a watershed, determines erosion and
aggradation depths, evolves the terrain and regolith and tracks attributes of
the regolith. This model is intended for modeling the runout extent, topographic
change and sediment transport caused by a mapped landslide(s) or landslides
inferred from a landslide hazard map.
Examples
----------
Import necessary packages and components
>>> import numpy as np
>>> from landlab import RasterModelGrid
>>> from landlab.components import FlowDirectorMFD
>>> from landlab.components.mass_wasting_runout import MassWastingRunout
Define the topographic__elevation field of a 7 columns by 7 rows, 10-meter
raster model grid
>>> dem = np.array(
... [
... [10, 8, 4, 3, 4, 7.5, 10],
... [10, 9, 3.5, 4, 5, 8, 10],
... [10, 9, 6.5, 5, 6, 8, 10],
... [10, 9.5, 7, 6, 7, 9, 10],
... [10, 10, 9.5, 8, 9, 9.5, 10],
... [10, 10, 10, 10, 10, 10, 10],
... [10, 10, 10, 10, 10, 10, 10],
... ]
... )
>>> dem = np.hstack(dem).astype(float)
>>> mg = RasterModelGrid((7, 7), 10)
>>> _ = mg.add_field("topographic__elevation", dem, at="node")
Define boundary conditions
>>> mg.set_closed_boundaries_at_grid_edges(True, True, True, True)
Add multiflow direction fields, soil thickness (here set to 1 meter)
>>> fd = FlowDirectorMFD(mg, diagonals=True, partition_method="slope")
>>> fd.run_one_step()
>>> nn = mg.number_of_nodes
>>> depth = np.ones(nn) * 1
>>> _ = mg.add_field("soil__thickness", depth, at="node")
Define the initial landslide. Any mass_wasting_id value >1 is considered a
landslide. The landslide extent is defined by assigining all nodes withing
the landslide the same mass_wasting_id value.
Here, the landslide is represented by a single node (node 38), which assigned
a mass_wasting_id value of 1:
>>> mg.at_node["mass__wasting_id"] = np.zeros(nn).astype(int)
>>> mg.at_node["mass__wasting_id"][np.array([38])] = 1
Add attributes of the regolith as fields of the raster model grid that will
be tracked by the model. These could be any attribute in which the tracking
method used by MassWastingRunout reasonably represents movement of the
attribute. Here we track the particle diameter and organic content
of the regolith. Note, a particle__diameter field is required if shear stress
is determined as a function of grain size.
>>> np.random.seed(seed=7)
>>> mg.at_node["particle__diameter"] = np.random.uniform(0.05, 0.25, nn)
>>> mg.at_node["organic__content"] = np.random.uniform(0.01, 0.10, nn)
Next define parameter values for MassWastingRunout and instantiate the model:
>>> Sc = [0.03] # Sc, note: defined as a list (see below)
>>> qsc = 0.01 # qsc
>>> k = 0.02 # k
>>> h_max = 1
>>> tracked_attributes = ["particle__diameter", "organic__content"]
>>> example_square_MWR = MassWastingRunout(
... mg,
... critical_slope=Sc,
... threshold_flux=qsc,
... erosion_coefficient=k,
... tracked_attributes=tracked_attributes,
... effective_qsi=True,
... max_flow_depth_observed_in_field=h_max,
... save=True,
... )
Run MassWastingRunout
>>> example_square_MWR.run_one_step()
By subtracting the initial DEM from the final DEM, which has evolvod as
a consequence of the runout, we can see areas of aggradation (positive values)
and erosion (negative values). Nodes with non-zero topographic change
represent the runout extent.
>>> DEM_initial = mg.at_node["topographic__initial_elevation"]
>>> DEM_final = mg.at_node["topographic__elevation"]
>>> DEM_final - DEM_initial
array([ 0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0.96579201,
0.52734339, -0.00778869, 0. , 0. , 0. ,
0. , -0.00594927, -0.12261762, -0.0027898 , 0. ,
0. , 0. , 0. , -0.04562554, -0.10973222,
-0.05776526, 0. , 0. , 0. , 0. ,
-0.01225359, -0.07973101, -0.04888238, 0. , 0. ,
0. , 0. , 0. , -1. , 0. ,
0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. ])
See how the landslide removes all of the regolith at node 38 (the negative -1)
Look at the final spatial distribution of regolith particle diameter.
>>> mg.at_node["particle__diameter"]
array([0.06526166, 0.20598376, 0.13768185, 0.19469304, 0.2455979 ,
0.15769917, 0.15022409, 0.06441023, 0.1036878 , 0.15619144,
0.17680799, 0.21074781, 0.12618823, 0.06318727, 0.10762912,
0.23191871, 0.09295928, 0.14042479, 0.23545374, 0.05497985,
0.17010978, 0.2400259 , 0.09606058, 0.15969798, 0.23182567,
0.07663389, 0.15468252, 0.20008197, 0.18380265, 0.14355057,
0.09096982, 0.14815318, 0.12447694, 0.14548023, 0.12317808,
0.2175836 , 0.2037295 , 0.11279894, 0. , 0.10520981,
0.14056859, 0.12059567, 0.18147989, 0.12407022, 0.1418186 ,
0.19386482, 0.13259837, 0.23128465, 0.08609032])
Also note that the attribute value is set to zero at any node in which the regolith
depth is 0.
References
----------
Keck, J., Istanbulluoglu, E., Campforts, B., Tucker G., Horner-Devine A.,
A landslide runout model for sediment transport, landscape evolution and hazard
assessment applications, submitted to Earth Surface Dynamics (2023)
"""
_name = "MassWastingRunout"
_unit_agnostic = False
_info = {
"mass__wasting_id": {
"dtype": int,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "interger or float id of each mass wasting area is assigned \
to all nodes representing the mass wasting area.",
},
"topographic__elevation": {
"dtype": float,
"intent": "inout",
"optional": True,
"units": "m",
"mapping": "node",
"doc": "Land surface topographic elevation",
},
"soil__thickness": {
"dtype": float,
"intent": "inout",
"optional": False,
"units": "m",
"mapping": "node",
"doc": "soil depth to restrictive layer",
},
"flow__receiver_node": {
"dtype": int,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "Node array of receivers (node that receives flow from current node)",
},
"flow__receiver_proportions": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "Node array of proportion of flow sent to each receiver.",
},
"topographic__steepest_slope": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "The steepest *downhill* slope",
},
"particle__diameter": {
"dtype": float,
"intent": "inout",
"optional": True,
"units": "m",
"mapping": "node",
"doc": "representative particle diameter at each node, this might \
vary with underlying geology, contributing area or field observations",
},
}
[docs]
def __init__(
self,
grid,
critical_slope=0.05,
threshold_flux=0.25,
erosion_coefficient=0.005,
tracked_attributes=None,
deposition_rule="critical_slope",
grain_shear=True,
effective_qsi=False,
settle_deposit=False,
E_constraint=True,
save=False,
typical_flow_thickness_of_erosion_zone=2,
typical_slope_of_erosion_zone=0.15,
erosion_exponent=0.2,
max_flow_depth_observed_in_field=None,
vol_solids_concentration=0.6,
density_solids=2650,
density_fluid=1000,
gravity=9.81,
dist_to_full_qsc_constraint=0,
itL=1000,
run_id=0,
):
"""
Parameters
----------
grid: landlab raster model grid
critical_slope: list of floats
critical slope (angle of repose if no cohesion) of mass
wasting material , L/L list of length 1 for a basin uniform Sc value
list of length 2 for hydraulic geometry defined Sc, where the first
and second values in the list are the coefficient and exponent of a
user defined function for crictical slope that varies with contributing
area [m2] to a node (e.g., [0.146,0.051] sets Sc<=0.1 at
contributing area > ~1100 m2).
threshold_flux: float
minimum volumetric flux per unit contour width, [L3/L2/iterataion] or
[L/iteration]. Flux below this threshold stops at the cell as a deposit
erosion_coefficient: float
coefficient used to convert total basal shear stress [kPa] of the runout
material to a scour depth [m]
tracked_attributes : list of str or None
A list of the attribute names (strings) that will be tracked by the
runout model. Attributes in tracked_attributes must also be a field
on the model grid and names in list must match the grid field names.
Default is None.
deposition_rule : str
Can be either "critical_slope", "L_metric" or "both".
"critical_slope" is deposition rule used in Keck et al. 2023.
"L_metric" is a variation of rule described by Campforts et al. 2020.
"both" uses the minimum value of both rules.
Default value is "critical_slope".
grain_shear : bool
Indicate whether to define shear stress at the base of the runout material
as a function of grain size using Equation 13 (True) or the depth-slope
approximation using Equation 12 (False). Default is True.
effective_qsi : bool
Indicate whether to limit erosion and aggradation rates to <= the
erosion and aggradation rates coorisponding to the maximum observed flow
depth. All results in Keck et al. 2023 use this constraint. Default is True.
E_constraint : bool
Indicate if erosion can not simultaneously occur with aggradation. If True,
aggradation > 0, then erosion = 0. This is True in Keck et al., 2023.
Default is True.
settle_deposit : bool
Indicate whether to allow deposits to settle before the next model iteration
is implemented. Settlement is determined the critical slope as evaluated from
the lowest adjacent node to the deposit. This is not used in Keck et al. 2023
but tends to allow model to better reproduce smooth, evenly sloped deposits.
Default is False.
save : bool
Save topographic elevation of watershed after each model iteration?
This uses a lot of memory but is helpful for illustrating runout.
The default is False.
Other Parameters
----------
These parameters have a lesser impact on model behavior or may not be
applicable, depending on model run options.
typical_flow_thickness_of_erosion_zone: float
field estimated flow thickness in the erosion-dominatedd reaches of
the runout path [m], used to estimate erosion_coefficient k using
erosion_coef_k function. Default value: 3
typical slope_of_erosion_zone: float
field or remote sensing estimated slope in the scour dominated reach
of the runout path [L/L], used to estimate erosion_coefficient k using
erosion_coef_k function. Default value: 0.4
erosion_exponent: float
The exponent of equation 11, that scales erosion depth as a function of
shear stress. Default value: 0.5
max_flow_depth_observed: float
Maximum observed flow depth, over the entire
runout path [m], h_max in equation 24. Only used effective_qsi is True.
Default value: 4
vol_solids_concentration: float
The ratio of the volume of the solids to the total volume of the flow
mixture. Default value: 0.6
density solids: float
The density of the solids [kg/m3]. Default value: 2650
density fluid: float
The density of the fluid [kg/m3]. Default value: 1000
gravity: float
Acceleration due to gravity [m2/s]. Default value: 9.81
dist_to_full_qsc_constraint : float
distance in meters at which qsc is applied to runout. If the landslide
initiates on relatively flat terrain, it may be difficult to determine
a qsc value that allows the model start and deposit in a way that matches
the observed. In Keck et al. 2023, dist_to_full_qsc_constraint = 0, but
other landslides may need dist_to_full_qsc_constraint = 20 to 50 meters.
itL : int
maximum number of iterations the model runs before it
is forced to stop. The default is 1000. Ideally, if properly parameterized,
the model should stop on its own. All modeled runout in Keck et al. 2023
stopped on its own.
run_id : float, int or str
label for landslide run, can be the time or some other identifier. This
can be updated each time model is implemnted with "run_one_step"
Returns
-------
None
"""
if isinstance(critical_slope, (float, int)):
critical_slope = (critical_slope,)
super().__init__(grid)
if len(critical_slope) > 1:
self.variable_slpc = True
self.a = critical_slope[0]
self.b = critical_slope[1]
else:
self.variable_slpc = False
self.slpc = critical_slope[0]
self.qsc = threshold_flux
self.k = erosion_coefficient
self._tracked_attributes = tracked_attributes
self.deposition_rule = deposition_rule
self.grain_shear = grain_shear
self.effective_qsi = effective_qsi
self.settle_deposit = settle_deposit
self.E_constraint = E_constraint
self.save = save
self.h = typical_flow_thickness_of_erosion_zone
self.s = typical_slope_of_erosion_zone
self.f = erosion_exponent
self.qsi_max = max_flow_depth_observed_in_field
if self.effective_qsi and self.qsi_max is None:
raise ValueError(
"Need to define the 'max_flow_depth_observed_in_field'"
" or set effective_qsi to False"
)
self.vs = vol_solids_concentration
self.ros = density_solids
self.rof = density_fluid
self.g = gravity
self.dist_to_full_qsc_constraint = dist_to_full_qsc_constraint
self.itL = itL
self.run_id = run_id
if tracked_attributes:
self.track_attributes = True
# check attributes are included in grid
for key in self._tracked_attributes:
if not self._grid.has_field(key, at="node"):
raise ValueError(f"{key} not included as field in grid")
# if using grain size dependent erosion, check
# particle_diameter is included as an attribute
if (
self.grain_shear
and "particle__diameter" not in self._tracked_attributes
):
raise ValueError(
"'particle__diameter' not included as field in grid and/or"
" key in tracked_attributes"
)
else:
self.track_attributes = False
# flow routing option
# 'square_root_of_slope', see flow director
self.routing_partition_method = "slope"
# density of runout mixture
self.ro_mw = self.vs * self.ros + (1 - self.vs) * self.rof
# number of model iterations needed to reach dist_to_full_qsc_constraint
self.d_it = int(self.dist_to_full_qsc_constraint / self._grid.dx)
# define initial topographic + mass wasting thickness topography
self._grid.at_node["energy__elevation"] = self._grid.at_node[
"topographic__elevation"
].copy()
self._grid.at_node["topographic__initial_elevation"] = self._grid.at_node[
"topographic__elevation"
].copy()
# prepare data containers for saving model images and behavior statistics
if self.save:
self.saver = MassWastingSaver(self)
self.saver.prep_data_containers()
[docs]
def run_one_step(self):
"""run MWR"""
# get all nodes that define the mass wasting events
mask = self._grid.at_node["mass__wasting_id"] > 0
# separate the mass wasting event nodes into individual events
self.mw_ids = np.unique(self._grid.at_node["mass__wasting_id"][mask])
innL = [] # innL is list of lists of nodes in each mass wasting event
for mw_id in self.mw_ids:
ls_mask = self._grid.at_node["mass__wasting_id"] == mw_id
innL.append(np.hstack(self._grid.nodes)[ls_mask])
# For each mass wasting event in list:
for mw_i, inn in enumerate(innL):
mw_id = self.mw_ids[mw_i]
self._lsvol = (
self._grid.at_node["soil__thickness"][inn].sum()
* self._grid.dx
* self._grid.dy
)
# prepare temporary data containers for each mass wasting event mw_i
if self.save:
self.saver.prep_mw_data_containers(mw_i, mw_id)
# Algorithm 1, prepare initial mass wasting material (debritons) for release
self._prep_initial_mass_wasting_material(inn, mw_i)
# self.arndn_r[mw_id].append(self.arndn)
if self.save:
# save first set of data to reflect scar created by landslide
self.saver.save_conditions_before_runout(mw_i, mw_id)
# Algorith 2, now loop through each receiving nodes,
# determine next set of recieving nodes,
# repeat until no more receiving nodes (material deposits)
self.c = 0 # model iteration counter
while len(self.arn) > 0 and self.c < self.itL:
# set qsc: the qsc constraint does not fully apply until runout
# has traveled dist_to_full_qsc_constraint
if self.d_it == 0:
self.qsc_v = self.qsc
else:
self.qsc_v = self.qsc * (min(self.c / self.d_it, 1))
# temporary data containers for each iteration of the while loop,
# that store receiving node, flux and attributes to become the
# input for the next iteration
self.arndn_ns = np.array([]) # next iteration donor nodes
self.arn_ns = np.array([]) # next iteration receiver nodes
self.arqso_ns = np.array([]) # next iteration flux to receiver nodes
self.arnL = [] # list of receiver nodes
self.arqsoL = [] # list of flux out
self.arndnL = [] # list of donor nodes
if self.track_attributes:
self.aratt_ns = dict.fromkeys(
self._tracked_attributes, np.array([])
) #
self.arattL = dict.fromkeys(self._tracked_attributes, [])
# for each unique node in receiving node list self.arn
self.arn_u = np.unique(self.arn).astype(int) # unique arn list
# determine the incoming flux to each node in self.arn_u
self._determine_qsi()
# update node elevation plus incoming flow thickness
# this happens even if using topographic__elevation to route so that
# the thickness of the debris flow is tracked for plotting
self._update_E_dem()
# determine erosion, aggradation, qso and attributes,
# arranged in array nudat
self._E_A_qso_determine_attributes()
# from qso and flow direction at node, determine flux and attributes
# sent to each receiver node
self._determine_rn_proportions_attributes()
# update grid field: topographic__elevation with the values in
# nudat. Do this after directing flow, because assume deposition
# does not impact flow direction
self._update_dem()
# update topographic slope field
self._update_topographic_slope()
# update tracked attribute grid fields
if self._tracked_attributes:
for key in self._tracked_attributes:
self._update_attribute_at_node(key)
# optional settlment of deposits and redistribution of attributes
if self.settle_deposit:
self._settle()
self._update_topographic_slope()
# once all nodes in this iteration have been processed, the lists of receiving
# nodes (arn), donor nodes (arndn, which are the recieving nodes of this step),
# outgoing node flux (arqso) and node attributes (artt) are updated
# for the next iteration
self.arndn = self.arndn_ns.astype(int)
self.arn = self.arn_ns.astype(int)
self.arqso = self.arqso_ns #
if self.track_attributes:
self.aratt = self.aratt_ns
if self.save:
self.saver.save_conditions_after_one_iteration(mw_i, mw_id)
# update iteration counter
self.c += 1
def _prep_initial_mass_wasting_material(self, inn, mw_i):
"""Algorithm 1 - from an initial source area (landslide), prepare the
initial lists of receiving nodes and incoming fluxes and attributes
and remove the source material from the DEM
Parameters
----------
inn: np.array
node id's that make up the area of the initial mass wasting area
mw_i: int
index of the initial mass wasting area (e.g., if there are two landslides
the first landslide will be mw_i = 0,
the second will be mw_i = 0)
"""
# data containers for initial recieving node, outgoing flux and attributes
rni = np.array([])
rqsoi = np.array([])
if self._tracked_attributes:
att = dict.fromkeys(self._tracked_attributes, np.array([]))
# order source area nodes from lowest to highest elevation
node_z = self._grid.at_node.dataset["topographic__elevation"][inn]
zdf = pd.DataFrame({"nodes": inn, "z": node_z})
zdf = zdf.sort_values("z")
for ci, ni in enumerate(zdf["nodes"].values):
# regolith (soil) thickness at node. soil thickness in source area
# represents landslide thickness
s_t = self._grid.at_node.dataset["soil__thickness"].values[ni]
# remove soil (landslide) thickness at node
self._grid.at_node.dataset["topographic__elevation"][ni] = (
self._grid.at_node.dataset["topographic__elevation"][ni] - s_t
)
# update soil thickness at node (now = 0)
self._grid.at_node["soil__thickness"][ni] = (
self._grid.at_node["soil__thickness"][ni] - s_t
)
if (
ci > 0
): # use surface slope for first node to start movement of landslide
# for all other nodes, update slope to reflect material removed from DEM
self._update_topographic_slope()
# get receiving nodes of node ni in mw index mw_i
rn = self._grid.at_node.dataset["flow__receiver_node"].values[ni]
rn = rn[np.where(rn != -1)]
# receiving proportion of qso from cell n to each downslope cell
rp = self._grid.at_node.dataset["flow__receiver_proportions"].values[ni]
rp = rp[np.where(rp > 0)] # only downslope cells considered
# initial mass wasting thickness
imw_t = s_t
# get flux out of node ni
qso = imw_t
# divide into proportions going to each receiving node
rqso = rp * qso
if self._tracked_attributes:
# get initial mass wasting attributes moving (out) of node ni
self.att_ar_out = {}
for key in self._tracked_attributes:
att_val = self._grid.at_node.dataset[key].values[ni]
# particle diameter to each recieving node
self.att_ar_out[key] = np.ones(len(rqso)) * att_val
# attribute value is zero at node after reglith leaves
self._grid.at_node[key][ni] = 0
att[key] = np.concatenate((att[key], self.att_ar_out[key]), axis=0)
# append receiving node ids, fluxes and attributes to initial lists
rni = np.concatenate((rni, rn), axis=0)
rqsoi = np.concatenate((rqsoi, rqso), axis=0)
self.arndn = np.ones([len(rni)]) * np.nan
self.arn = rni
self.arqso = rqsoi
if self._tracked_attributes:
self.aratt = att
def _E_A_qso_determine_attributes(self):
"""mass conservation at a grid cell, implemented using the EAqA
function below.
"""
self.D_L = [] # list of deposition depths, if the _settle function is
# implemented, this is used to determine settlement after deposition
def EAqA(qsi_r):
"""function for iteratively determining Erosion and Aggradiation depths,
the outgoing flux (qso) and Attribute values at each affected node and
attribute values of the outgoing flux.
thoughts for speeding this up: use cython or try to restructure
EAqA function so that it can be applied with vector operations,
then filter all changes to correct cells where computations
should not have occurred
Parameters
----------
qsi_r: np array
a row of the self.qsi_dat array
"""
n = qsi_r[0]
qsi = qsi_r[1]
# maximum slope at node n
slpn = self._grid.at_node["topographic__steepest_slope"][n].max()
if self.effective_qsi:
qsi_ = min(qsi, self.qsi_max)
else:
qsi_ = qsi
# look up critical slope at node n
if (
self.variable_slpc
): # if option 1, critical slope is not constant but depends on location
self.slpc = self.a * self._grid.at_node["drainage_area"][n] ** self.b
# incoming attributes (weighted average)
if self._tracked_attributes:
att_in = self._attributes_in(n, qsi)
else:
att_in = None
rn_g = self._grid.at_node.dataset["flow__receiver_node"].values[n]
rn_g = rn_g[np.where(rn_g != -1)]
# if qsi less qsc (equation 1)
if qsi <= (self.qsc_v):
A = qsi
qso = 0
E = 0
deta = A
if self._tracked_attributes:
att_up = dict.fromkeys(self._tracked_attributes, 0)
else:
att_up = None
Tau = 0
u = 0
else:
# aggradation
A = min(qsi, self._aggradation(qsi, slpn, n))
if A > 0 and self.E_constraint:
E = 0
if self._tracked_attributes:
att_up = dict.fromkeys(self._tracked_attributes, 0)
else:
att_up = None
Tau = 0
u = 0
else:
# erosion
E, att_up, Tau, u = self._erosion(n, qsi_, slpn, att_in=att_in)
## flux out
qso = qsi - A + E
# small qso are considered zero
qso = np.round(qso, decimals=8)
# chage elevation
deta = A - E
# model behavior tracking
if self.save:
self.saver.save_flow_stats(E, A, qsi, slpn, Tau, u)
# updated attribute values at node
if self._tracked_attributes:
n_att = self._attributes_node(n, att_in, E, A)
else:
n_att = None
# list of deposition depths at cells in iteration
self.D_L.append(A)
# n_att, att_up, att_in are dictionaries of values of each attribute at
# the node after erosion and deposition, in the eroded material before depostion
# and incoming material in qsi respectively
return deta, qso, qsi, E, A, n_att, att_up, att_in
# apply EAqA function to all unique nodes in arn (arn_u)
# create nudat, an np.array of data for updating fields at each node
ll = np.array([EAqA(r) for r in self.qsi_dat], dtype=object)
arn_ur = np.reshape(self.qsi_dat[:, 0], (-1, 1))
self.nudat = np.concatenate((arn_ur, ll), axis=1)
def _determine_rn_proportions_attributes(self):
"""determine how outgoing flux is partitioned to downslope cells and
attributes of each parition"""
def rn_proportions_attributes(nudat_r):
n = nudat_r[0]
qso = nudat_r[2]
qsi = nudat_r[3]
E = nudat_r[4]
A = nudat_r[5]
att_up = nudat_r[7]
att_in = nudat_r[8]
# get donor node ids
dn = self.arndn[self.arn == n]
# get receiver node idds
rn = self._grid.at_node.dataset["flow__receiver_node"].values[n]
rn_ = rn.copy()
# remove node n and delivery nodes from the receiver node list
rn_ = rn_[np.where(rn_ != -1)]
rn_ = rn_[~np.isin(rn_, dn)]
# these constraints needed to avoid infinite loop
if qso > 0 and n not in self._grid.boundary_nodes:
if len(rn_) < 1: # if no receiver nodes, flux stays at node n
rp = np.array([1])
rn = np.array([n])
rqso = rp * qso
else: # flux is proportioned to remaining receiver nodes
rp = self._grid.at_node.dataset[
"flow__receiver_proportions"
].values[n]
rp = rp[np.isin(rn, rn_)]
rp = rp / rp.sum()
rn = rn_
rqso = rp * qso
# create donor node array, all values are donor node,
# length equal to number of receiver nodes
rndn = (np.ones(len(rn)) * n).astype(int)
# outgoing attributes
if self._tracked_attributes:
att_out = self._attribute_out(att_up, att_in, qsi, E, A)
for key in self._tracked_attributes:
ratt = np.ones(len(rqso)) * att_out[key]
self.aratt_ns[key] = np.concatenate(
(self.aratt_ns[key], ratt), axis=0
) # next step receiving node incoming particle diameter list
self.arattL[key].append(ratt)
# store receiving nodes and fluxes in temporary arrays
self.arndn_ns = np.concatenate(
(self.arndn_ns, rndn), axis=0
) # next iteration donor nodes
self.arn_ns = np.concatenate(
(self.arn_ns, rn), axis=0
) # next iteration receiving nodes
self.arqso_ns = np.concatenate(
(self.arqso_ns, rqso), axis=0
) # next iteration qsi
self.arnL.append(rn)
self.arqsoL.append(rqso)
self.arndnL.append(rndn)
[rn_proportions_attributes(r) for r in self.nudat]
def _determine_qsi(self):
"""determine flux of incoming material (qsi) to a node.
returns self.qsi_dat: np array of receiving nodes [column 0],
and qsi to those nodes [column 1]
"""
def _qsi(n):
"""sum the incoming flux to node n"""
qsi = np.sum(self.arqso[self.arn == n])
return qsi
ll = np.array([_qsi(n) for n in self.arn_u], dtype=object)
ll = np.reshape(ll, (-1, 1))
arn_ur = np.reshape(self.arn_u, (-1, 1))
self.qsi_dat = np.concatenate((arn_ur, ll), axis=1)
def _update_E_dem(self):
"""update energy__elevation"""
n = self.qsi_dat[:, 0].astype(int)
qsi = self.qsi_dat[:, 1]
# energy elevation is equal to the topographic elevation plus qsi
self._grid.at_node["energy__elevation"] = self._grid.at_node[
"topographic__elevation"
].copy()
self._grid.at_node["energy__elevation"][n] = (
self._grid.at_node["energy__elevation"].copy()[n] + qsi
)
def _update_energy_slope(self):
"""updates the topographic__slope and flow directions grid fields using the
energy__elevation field. This function is presently not used but may be useful
for future implementations of MWR"""
fd = FlowDirectorMFD(
self._grid,
surface="energy__elevation",
diagonals=True,
partition_method=self.routing_partition_method,
)
fd.run_one_step()
def _update_dem(self):
"""updates the topographic elevation of the landscape dem and soil
thickness fields"""
n = self.nudat[:, 0].astype(int)
deta = self.nudat[:, 1]
self._grid.at_node["soil__thickness"][n] = (
self._grid.at_node["soil__thickness"][n] + deta
)
self._grid.at_node["topographic__elevation"][n] = (
self._grid.at_node["topographic__elevation"][n] + deta
)
def _update_topographic_slope(self):
"""updates the topographic__slope and flow directions fields using the
topographic__elevation field"""
fd = FlowDirectorMFD(
self._grid,
surface="topographic__elevation",
diagonals=True,
partition_method=self.routing_partition_method,
)
fd.run_one_step()
def _update_attribute_at_node(self, key):
"""for each unique node in receiving node list, update the attribute
using attribute value determined in the _E_A_qso_determine_attributes method
Parameters
----------
key: string
one of the tracked attributes
"""
n = self.nudat[:, 0].astype(int)
new_node_pd = np.array([d[key] for d in self.nudat[:, 6]])
if np.isnan(np.sum(new_node_pd)):
raise ValueError(f"{key} is {new_node_pd}")
self._grid.at_node[key][n] = new_node_pd
def _settle(self):
"""for each unique node in receiving node list, after erosion, aggradation
and change in node elevation have been determined, check that the height of the node
is not greater than permitted by angle of repose/critical slope as evaluated from
the lowest cell. Note, slope is not updated in this function. It is updated
simultaneously at a later stage during the iteration.
"""
# for each node in the list, use the slope field, computed from the previous
# iteration, to compute settlment and settlment direction to adjacent cells
for ii, n in enumerate(self.arn_u):
if self.D_L[ii] > 0: # only settle if node has had deposition...use dif?
rn = self._grid.at_node.dataset["flow__receiver_node"].values[n]
# slope to all receiving cells
slpn = self._grid.at_node["topographic__steepest_slope"][n]
# only consider downslope cells
slpn = slpn[np.where(rn != -1)]
rn = rn[np.where(rn != -1)]
# critical slope
if self.variable_slpc:
self.slpc = (
self.a * self._grid.at_node["drainage_area"][n] ** self.b
)
# only consider all cells that slope > Sc
rn = rn[slpn > self.slpc]
slpn = slpn[slpn > self.slpc]
# if slope to downlsope nodes > Sc, adjust elevation of node n
if len(rn) >= 1:
# destribute material to downslope nodes based on weighted
# average slope (same as multiflow direciton proportions,
# but here only determined for downslope nodes in which
# S > Sc )
sslp = sum(slpn)
pp = slpn / sslp
# determine the total flux sent to S > Sc downslope cells
# mean/min downslope cell elevation
zo = self._grid.at_node["topographic__elevation"][rn].min()
# node n elevation
zi = self._grid.at_node["topographic__elevation"][n]
# height of node n using Sc*dx above min downslope elevation
slp_h = self.slpc * self._grid.dx
# out going sediment depth, determined as half the depth
# above the critical slope
qso_s = (zi - (zo + slp_h)) / 2
if qso_s < 0: # no negative (this shouldn't be needed because only
# nodes greater than slpc considered)
qso_s = 0
if qso_s > self.D_L[ii]: # settlement out can not exceed deposit
qso_s = self.D_L[ii]
qso_s_i = qso_s * pp # proportion sent to each receiving cell
# update the topographic elevation
self._grid.at_node["topographic__elevation"][n] = (
self._grid.at_node["topographic__elevation"][n] - qso_s
)
self._grid.at_node["topographic__elevation"][rn] = (
self._grid.at_node["topographic__elevation"][rn] + qso_s_i
)
# update the soil thickness
self._grid.at_node["soil__thickness"][n] = (
self._grid.at_node["soil__thickness"][n] - qso_s
)
self._grid.at_node["soil__thickness"][rn] = (
self._grid.at_node["soil__thickness"][rn] + qso_s_i
)
# update tracked attributes for sediment movement during settlement
if self._tracked_attributes:
# create the att_in dict, this is the same for each of the rn
att_in = {}
for key in self._tracked_attributes:
att_in[key] = self._grid.at_node[key][n]
for v, n_ in enumerate(rn):
A = qso_s_i[v]
n_att_d_ = self._attributes_node(n_, att_in, 0, A)
for key in self._tracked_attributes:
self._grid.at_node[key][n_] = n_att_d_[key]
def _erosion(self, n, depth, slpn, att_in=None):
"""if self.grain_shear is True, determines the erosion depth using
equation (13), otherwise uses equation (12).
Parameters
----------
n : int
node id
depth : float
erosion depth
slpn : float
slope in [l/L]
att_in: dict
dictionary of the value of each attribute, this function
only uses particle__diameter
Returns
-------
E : float
erosion depth [L]
att_up : dict
dictionary of the value of each attribute at node n
Tau : float
basal shear stress [Pa]
u : float
flow velocity [m/s]
"""
theta = np.arctan(slpn) # convert tan(theta) to theta
# attributes of eroded material
if self._tracked_attributes:
att_up = {}
for key in self._tracked_attributes:
att_up[key] = self._grid.at_node[key][n]
else:
att_up = None
if self.grain_shear:
# shear stress approximated as a power function of inertial shear stress
Dp = att_in["particle__diameter"]
if depth < Dp: # grain size dependent erosion breaks if depth<Dp
Dp = depth * 0.99
u = flow_velocity(Dp, depth, slpn, self.g)
Tau = shear_stress_grains(self.vs, self.ros, Dp, depth, slpn, self.g)
Ec = self._grid.dx * erosion_rate(self.k, Tau, self.f, self._grid.dx)
else:
# quasi-static approximation
Tau = self.ro_mw * self.g * depth * (np.sin(theta))
Ec = self.k * (Tau) ** self.f
u = np.nan
dmx = self._grid.at_node["soil__thickness"][n]
E = min(dmx, Ec)
return (E, att_up, Tau, u)
def _aggradation(self, qsi, slpn, n):
"""determine aggradation depth as a function of a threshold-slope,
L*qsi or a function of both. Where the L metric is computed following
Campforts, et al., 2020 but is expressed as 1-(slpn/slpc)**2 rather
than dx/(1-(slpn/slpc)**2)
Parameters
----------
qsi : float
incoming flux [l3/iteration/l2]
slpn : float
slope at node in [L/L]
n : int
node id
Returns
-------
A : float
aggradation depth [L]
"""
if self.deposition_rule == "L_metric":
A = self._deposit_L_metric(qsi, slpn)
elif self.deposition_rule == "critical_slope":
A = self._deposit_friction_angle(qsi, n)
elif self.deposition_rule == "both":
A_L = self._deposit_L_metric(qsi, slpn)
A_f = self._deposit_friction_angle(qsi, n)
A = min(A_L, A_f)
return A
def _determine_zo(self, n, zi, qsi):
"""determine the minimum elevation of the adjacent nodes. If all adjacent
nodes are higher than the elevation of the node + qsi, zo is set to zi
Parameters
----------
n : int
node id
zi : float
topographic elevation at node n (eta_n)
qsi : float
incoming flux [l3/iteration/l2]
Returns
-------
zo : float
topographic elevation of the lowest elevation node [l],
adjacent to node n
"""
# get adjacent nodes
adj_n = np.hstack(
(
self._grid.adjacent_nodes_at_node[n],
self._grid.diagonal_adjacent_nodes_at_node[n],
)
)
# exclude closed boundary nodes
adj_n = adj_n[~np.isin(adj_n, self._grid.closed_boundary_nodes)]
# elevation of flow surface at node... may not need this
ei = qsi + zi
# nodes below elevation of node n
rn_e = adj_n[self._grid.at_node["topographic__elevation"][adj_n] < ei]
if len(rn_e) > 0:
zo = self._grid.at_node["topographic__elevation"][rn_e].min()
else: # an obstruction in the DEM
zo = zi
return zo
def _deposit_L_metric(self, qsi, slpn):
"""
determine the L metric similar to Campforts et al. (2020)
Parameters
----------
qsi : float
in coming flux per unit contour width
slpn : float
slope of node, measured in downslope direction (downslope is postive)
Returns
-------
A_L : float
aggradation depth [L]
"""
Lnum = np.max([(1 - (slpn / self.slpc) ** 2), 0])
A_L = qsi * Lnum
return A_L
def _deposit_friction_angle(self, qsi, n):
"""determine deposition depth following equations 4 though 9
Parameters
----------
qsi : float
incoming flux [l3/iteration/l2]
n : int
node id
Returns
-------
A_f : float
aggradation depth [L]
"""
slp_h = self.slpc * self._grid.dx
zi = self._grid.at_node["topographic__elevation"][n]
zo = self._determine_zo(n, zi, qsi)
rule = (zi - zo) <= (slp_h)
def eq(qsi, zo, zi, slp_h):
dx = self._grid.dx
sc = self.slpc
s = (zi - zo) / dx
sd = sc - s
D1 = sc * dx / 2
a = 0.5 * dx * sd
b = D1 - 0.5 * dx * sd
c = -qsi
N1 = -b + (((b**2) - 4 * a * c) ** 0.5) / (2 * a)
N2 = -b - (((b**2) - 4 * a * c) ** 0.5) / (2 * a)
ndn = np.round(max([N1, N2, 1])) # aggradation on at least one node
A = min((1 / ndn) * qsi + ((ndn - 1) / 2) * dx * sd, qsi)
return A
if rule:
A_f = eq(qsi, zo, zi, slp_h)
else:
A_f = 0
if A_f < 0:
warnings.warn(
f"negative aggradation!! n={n}, qsi={qsi}, A_f={A_f}, zo={zo}, zi={zi}",
stacklevel=2,
)
# raise(ValueError)
return A_f
def _attributes_in(self, n, qsi):
"""determine the weighted average attribute value of the incoming
flow
Parameters
----------
n : int
node id
qsi : float
incoming flux [l3/iteration/l2]
Returns
-------
att_in: dict
dictionary of each attribute value flowing into the node
"""
if qsi == 0:
att_in = dict.fromkeys(self._tracked_attributes, 0)
elif (np.isnan(qsi)) or (np.isinf(qsi)):
msg = "in-flowing flux is nan or inf"
raise ValueError(msg)
else:
att_in = {}
for key in self._tracked_attributes:
att_in[key] = np.sum(
(self.aratt[key][self.arn == n]) * (self.arqso[self.arn == n]) / qsi
)
return att_in
def _attributes_node(self, n, att_in, E, A):
"""determine the weighted average attributes of the newly aggraded material
+ the inplace regolith
Parameters
----------
n : int
node id
att_in: dict
dictionary of the value of each attribute flowing into the node
E : float
erosion depth [L]
A : float
aggradation depth [L]
Returns
-------
n_att_d: dict
dictionary of each attribute value at the node after erosion
and aggradation
"""
def weighted_avg_at_node(key):
"""determine weighted average attribute at the node. If all soil
is eroded, attribute value is zero"""
if A + self._grid.at_node["soil__thickness"][n] - E > 0:
inatt = self._grid.at_node[key][n]
n_att = (
inatt * (self._grid.at_node["soil__thickness"][n] - E)
+ att_in[key] * A
) / (A + self._grid.at_node["soil__thickness"][n] - E)
else:
n_att = 0
if (n_att < 0) or (np.isnan(n_att)) or (np.isinf(n_att)):
msg = "node particle diameter is negative, nan or inf"
raise ValueError(msg)
return n_att
n_att_d = {}
for key in self._tracked_attributes:
n_att_d[key] = weighted_avg_at_node(key)
return n_att_d
def _attribute_out(self, att_up, att_in, qsi, E, A):
"""determine the weighted average attributes of the outgoing
flux
Parameters
----------
att_up: dict
dictionary of each attribute value at the node before erosion
or aggradation
att_in: dict
dictionary of each attribute value flowing into the node
qsi : float
incoming flux [l3/iteration/l2]
E : float
erosion depth [L]
A : float
aggradation depth [L]
Returns
-------
att_out: dict
dictionary of each attribute value flowing out of the node
"""
att_out = {}
for key in self._tracked_attributes:
att_out[key] = np.sum(
(att_up[key] * E + att_in[key] * (qsi - A)) / (qsi - A + E)
)
check_val = att_out[key]
if (check_val <= 0) or (np.isnan(check_val)) or (np.isinf(check_val)):
msg = f"out-flowing {key} is zero, negative, nan or inf"
raise ValueError(msg)
return att_out
[docs]
def flow_velocity(Dp, h, s, g):
us = (g * h * s) ** 0.5
u = us * 5.75 * np.log10(h / Dp)
return u
[docs]
def shear_stress_grains(vs, ros, Dp, h, s, g):
theta = np.arctan(s)
phi = np.arctan(0.32)
u = flow_velocity(Dp, h, s, g)
dudz = u / h
Tcn = np.cos(theta) * vs * ros * (Dp**2) * (dudz**2)
tau = Tcn * np.tan(phi)
return tau
[docs]
def shear_stress_static(vs, ros, rof, h, s, g):
theta = np.arctan(s)
rodf = vs * ros + (1 - vs) * rof
tau = rodf * g * h * (np.sin(theta))
return tau
[docs]
def erosion_coef_k(E_l, tau, f, dx):
k = E_l * dx / (tau**f)
return k
[docs]
def erosion_rate(k, tau, f, dx):
E_l = (k * tau**f) / dx
return E_l