import numpy as np
from landlab import Component
from landlab import MissingKeyError
from landlab.utils.return_array import return_array_at_node
from ..depression_finder.lake_mapper import _FLOODED
from .cfuncs import brent_method_erode_fixed_threshold
from .cfuncs import brent_method_erode_variable_threshold
[docs]
class StreamPowerEroder(Component):
"""Erode where channels are.
Implemented as:
.. math::
E = K A^m S^n - sp_{crit},
and if :math:`E < 0`, :math:`E = 0`.
If ``channel_width_field`` is declared and ``True``, the module instead implements:
.. math::
E = K A^m S^n / W - sp_{crit}
Note that although the Braun-Willett (2013) scheme that underlies this
component is nominally implicit, and will reach a numerically-correct
solution under topographic steady state regardless of timestep length, the
accuracy of transient solutions is *not* timestep independent (see
Braun & Willett 2013, Appendix B for further details).
Although the scheme remains significantly more robust and permits longer
timesteps than a traditional explicit solver under such conditions, it
is still possible to create numerical instability through use of too long
a timestep while using this component. The user is cautioned to check their
implementation is behaving stably before fully trusting it.
Examples
--------
>>> import numpy as np
>>> from landlab import RasterModelGrid
>>> from landlab.components import FlowAccumulator, StreamPowerEroder
>>> mg = RasterModelGrid((5, 5), xy_spacing=10.0)
>>> z = np.array(
... [
... [7.0, 7.0, 7.0, 7.0, 7.0],
... [7.0, 5.0, 3.2, 6.0, 7.0],
... [7.0, 2.0, 3.0, 5.0, 7.0],
... [7.0, 1.0, 1.9, 4.0, 7.0],
... [7.0, 0.0, 7.0, 7.0, 7.0],
... ]
... )
>>> z = mg.add_field("topographic__elevation", z, at="node")
>>> fr = FlowAccumulator(mg, flow_director="D8")
>>> sp = StreamPowerEroder(mg, K_sp=1.0)
>>> fr.run_one_step()
>>> sp.run_one_step(dt=1.0)
>>> z
array([7. , 7. , 7. , 7. , 7. ,
7. , 2.92996598, 2.02996598, 4.01498299, 7. ,
7. , 0.85993197, 1.87743897, 3.28268321, 7. ,
7. , 0.28989795, 0.85403051, 2.42701526, 7. ,
7. , 0. , 7. , 7. , 7. ])
>>> mg2 = RasterModelGrid((3, 7))
>>> z = np.array(mg2.node_x**2.0)
>>> z = mg2.add_field("topographic__elevation", z, at="node")
>>> mg2.status_at_node[mg2.nodes_at_left_edge] = mg2.BC_NODE_IS_FIXED_VALUE
>>> mg2.status_at_node[mg2.nodes_at_top_edge] = mg2.BC_NODE_IS_CLOSED
>>> mg2.status_at_node[mg2.nodes_at_bottom_edge] = mg2.BC_NODE_IS_CLOSED
>>> mg2.status_at_node[mg2.nodes_at_right_edge] = mg2.BC_NODE_IS_CLOSED
>>> fr2 = FlowAccumulator(mg2, flow_director="D8")
>>> sp2 = StreamPowerEroder(mg2, K_sp=0.1, m_sp=0.0, n_sp=2.0, threshold_sp=2.0)
>>> fr2.run_one_step()
>>> sp2.run_one_step(dt=10.0)
>>> z.reshape((3, 7))[1, :]
array([ 0. , 1. , 4. , 8.52493772, 13.29039699,
18.44367949, 36. ])
>>> mg3 = RasterModelGrid((5, 5), xy_spacing=2.0)
>>> z = mg.node_x / 100.0
>>> z = mg3.add_field("topographic__elevation", z, at="node")
>>> mg3.status_at_node[mg3.nodes_at_left_edge] = mg3.BC_NODE_IS_FIXED_VALUE
>>> mg3.status_at_node[mg3.nodes_at_top_edge] = mg3.BC_NODE_IS_CLOSED
>>> mg3.status_at_node[mg3.nodes_at_bottom_edge] = mg3.BC_NODE_IS_CLOSED
>>> mg3.status_at_node[mg3.nodes_at_right_edge] = mg3.BC_NODE_IS_CLOSED
>>> mg3.at_node["water__unit_flux_in"] = mg3.node_y
>>> fr3 = FlowAccumulator(mg3, flow_director="D8")
>>> sp3 = StreamPowerEroder(
... mg3,
... K_sp=1.0,
... sp_type="Unit",
... a_sp=1.0,
... b_sp=0.5,
... c_sp=1.0,
... discharge_field="surface_water__discharge",
... )
>>> fr3.run_one_step()
>>> sp3.run_one_step(1.0)
>>> z
array([0. , 0.1 , 0.2 , 0.3 , 0.4 ,
0. , 0.02898979, 0.0859932 , 0.17463772, 0.4 ,
0. , 0.02240092, 0.06879049, 0.14586033, 0.4 ,
0. , 0.01907436, 0.05960337, 0.12929386, 0.4 ,
0. , 0.1 , 0.2 , 0.3 , 0.4 ])
References
----------
**Required Software Citation(s) Specific to this Component**
None Listed
**Additional References**
Braun, J., Willett, S. (2013). A very efficient O(n), implicit and parallel
method to solve the stream power equation governing fluvial incision and
landscape evolution. Geomorphology 180-181(C), 170-179.
https://dx.doi.org/10.1016/j.geomorph.2012.10.008
"""
_name = "StreamPowerEroder"
_unit_agnostic = True
_info = {
"drainage_area": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "m**2",
"mapping": "node",
"doc": "Upstream accumulated surface area contributing to the node's discharge",
},
"flow__link_to_receiver_node": {
"dtype": int,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "ID of link downstream of each node, which carries the discharge",
},
"flow__receiver_node": {
"dtype": int,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "Node array of receivers (node that receives flow from current node)",
},
"flow__upstream_node_order": {
"dtype": int,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "Node array containing downstream-to-upstream ordered list of node IDs",
},
"topographic__elevation": {
"dtype": float,
"intent": "inout",
"optional": False,
"units": "m",
"mapping": "node",
"doc": "Land surface topographic elevation",
},
}
[docs]
def __init__(
self,
grid,
K_sp=0.001,
threshold_sp=0.0,
sp_type="set_mn",
m_sp=0.5,
n_sp=1.0,
a_sp=None,
b_sp=None,
c_sp=None,
channel_width_field=1.0,
discharge_field="drainage_area",
erode_flooded_nodes=True,
):
"""Initialize the StreamPowerEroder.
Parameters
----------
grid : ModelGrid
A grid.
K_sp : float, array, or field name
K in the stream power equation (units vary with other parameters).
threshold_sp : positive float, array, or field name, optional
The threshold stream power, below which no erosion occurs. This
threshold is assumed to be in "stream power" units, i.e., if
sp_type is 'Shear_stress', the value should be tau**a.
sp_type : {'set_mn', 'Total', 'Unit', 'Shear_stress'}
Controls how the law is implemented. If 'set_mn', use the supplied
values of m_sp and n_sp. Else, component will derive values of m and n
from supplied values of a_sp, b_sp, and c_sp, following Whipple and
Tucker:
* If ``'Total'``, ``m = a * c``, ``n = a``.
* If ``'Unit'``, ``m = a * c *(1 - b)``, ``n = a``.
* If ``'Shear_stress'``, ``m = 2 * a * c * (1 - b) / 3``,
``n = 2 * a / 3``.
m_sp : float, optional
m in the stream power equation (power on drainage area). Overridden if
a_sp, b_sp, and c_sp are supplied.
n_sp : float, optional, ~ 0.5<n_sp<4.
n in the stream power equation (power on slope). Overridden if
a_sp, b_sp, and c_sp are supplied.
a_sp : float, optional
The power on the SP/shear term to get the erosion rate; the "erosional
process" term. Only used if sp_type is not 'set_mn'.
b_sp : float, optional
The power on discharge to get width; the "hydraulic geometry" term.
Only used if sp_type in ('Unit', 'Shear_stress').
c_sp : float, optional
The power on area to get discharge; the "basin hydology" term. Only
used if sp_type is not 'set_mn'.
channel_width_field : None, float, array, or field name, optional
If not None, component will look for node-centered data describing
channel width or if an array, will take the array as the channel
widths. It will use the widths to implement incision ~ stream power
per unit width. If sp_type is 'set_mn', follows the equation given
above. If sp_type in ('Unit', 'Shear_stress'), the width value will
be implemented directly. W has no effect if sp_type is 'Total'.
discharge_field : float, field name, or array, optional
Discharge [L^2/T]. The default is to use the grid field
'drainage_area'. To use custom spatially/temporally varying
rainfall, use 'water__unit_flux_in' to specify water input to the
FlowAccumulator and use "surface_water__discharge" for this
keyword argument.
erode_flooded_nodes : bool (optional)
Whether erosion occurs in flooded nodes identified by a
depression/lake mapper (e.g., DepressionFinderAndRouter). When set
to false, the field *flood_status_code* must be present on the grid
(this is created by the DepressionFinderAndRouter). Default True.
"""
super().__init__(grid)
if "flow__receiver_node" in grid.at_node and grid.at_node[
"flow__receiver_node"
].size != grid.size("node"):
raise NotImplementedError(
"A route-to-multiple flow director has been "
"run on this grid. The landlab development team has not "
"verified that StreamPowerEroder is compatible with "
"route-to-multiple methods. Please open a GitHub Issue "
"to start this process."
)
if not erode_flooded_nodes and "flood_status_code" not in self._grid.at_node:
raise ValueError(
"In order to not erode flooded nodes another component "
"must create the field *flood_status_code*. You want to "
"run a lake mapper/depression finder."
)
self._erode_flooded_nodes = erode_flooded_nodes
self._A = return_array_at_node(grid, discharge_field)
self._elevs = return_array_at_node(grid, "topographic__elevation")
self._sp_crit = return_array_at_node(grid, threshold_sp)
# use setter for K defined below
self.K = K_sp
assert np.all(self._sp_crit >= 0.0)
if discharge_field == "drainage_area":
self._use_Q = False
else:
self._use_Q = True
if channel_width_field is None:
self._use_W = False
else:
self._use_W = True
self._W = return_array_at_node(grid, channel_width_field)
if np.any(threshold_sp != 0.0):
self._set_threshold = True
# ^flag for sed_flux_dep_incision to see if the threshold was
# manually set.
else:
self._set_threshold = False
self._type = sp_type
if sp_type == "set_mn":
assert (float(m_sp) >= 0.0) and (
float(n_sp) >= 0.0
), "m and n must be positive"
self._m = float(m_sp)
self._n = float(n_sp)
assert (
(a_sp is None) and (b_sp is None) and (c_sp is None)
), "If sp_type is 'set_mn', do not pass values for a, b, or c!"
else:
assert sp_type in ("Total", "Unit", "Shear_stress"), (
"sp_type not recognised. It must be 'set_mn', 'Total', "
+ "'Unit', or 'Shear_stress'."
)
assert (
m_sp == 0.5 and n_sp == 1.0
), "Do not set m and n if sp_type is not 'set_mn'!"
assert float(a_sp) >= 0.0, "a must be positive"
self._a = float(a_sp)
if b_sp is not None:
assert float(b_sp) >= 0.0, "b must be positive"
self._b = float(b_sp)
else:
assert self._use_W, "b was not set"
self._b = 0.0
if c_sp is not None:
assert float(c_sp) >= 0.0, "c must be positive"
self._c = float(c_sp)
else:
assert self._use_Q, "c was not set"
self._c = 1.0
if self._type == "Total":
self._n = self._a
self._m = self._a * self._c # ==_a if use_Q
elif self._type == "Unit":
self._n = self._a
self._m = self._a * self._c * (1.0 - self._b)
# ^ ==_a iff use_Q&use_W etc
elif self._type == "Shear_stress":
self._m = 2.0 * self._a * self._c * (1.0 - self._b) / 3.0
self._n = 2.0 * self._a / 3.0
else:
raise MissingKeyError(
"Not enough information was provided on the exponents to use!"
)
# m and n will always be set, but care needs to be taken to include Q
# and W directly if appropriate
self._stream_power_erosion = self._grid.zeros(at="node")
self._alpha = self._grid.zeros(at="node")
@property
def K(self):
"""Erodibility (units depend on m_sp)."""
return self._K
@K.setter
def K(self, new_val):
self._K = return_array_at_node(self._grid, new_val)
[docs]
def run_one_step(self, dt):
"""A simple, explicit implementation of a stream power algorithm.
If you are routing across flooded depressions in your flow routing
scheme, be sure to set *erode_flooded_nodes* flag in the instantiation
of the component to ensure erosion cannot occur in the lake. Erosion
is always zero if the gradient is adverse, but can still procede as
usual on the entry into the depression unless this flag is set.
Parameters
----------
dt : float
Time-step size
"""
if not self._erode_flooded_nodes:
flood_status = self._grid.at_node["flood_status_code"]
flooded_nodes = np.nonzero(flood_status == _FLOODED)[0]
else:
flooded_nodes = []
upstream_order_IDs = self._grid["node"]["flow__upstream_node_order"]
defined_flow_receivers = np.not_equal(
self._grid["node"]["flow__link_to_receiver_node"], self._grid.BAD_INDEX
)
try:
length_of_link = self._grid.length_of_d8
except AttributeError:
length_of_link = self._grid.length_of_link
flow_link_lengths = length_of_link[
self._grid.at_node["flow__link_to_receiver_node"][defined_flow_receivers]
]
flow_receivers = self._grid["node"]["flow__receiver_node"]
# Operate the main function:
if self._use_W:
self._alpha[defined_flow_receivers] = (
self._K[defined_flow_receivers]
* dt
* self._A[defined_flow_receivers] ** self._m
/ self._W[defined_flow_receivers]
/ (flow_link_lengths**self._n)
)
else:
self._alpha[defined_flow_receivers] = (
self._K[defined_flow_receivers]
* dt
* self._A[defined_flow_receivers] ** self._m
/ (flow_link_lengths**self._n)
)
# Handle flooded nodes, if any (no erosion there)
if flooded_nodes is not None:
self._alpha[flooded_nodes] = 0.0
reversed_flow = self._elevs < self._elevs[flow_receivers]
# this check necessary if flow has been routed across
# depressions
self._alpha[reversed_flow] = 0.0
threshdt = self._sp_crit * dt
# solve using Brent's Method in Cython for Speed
if isinstance(threshdt, float):
brent_method_erode_fixed_threshold(
upstream_order_IDs,
flow_receivers,
threshdt,
self._alpha,
self._n,
self._elevs,
)
else:
brent_method_erode_variable_threshold(
upstream_order_IDs,
flow_receivers,
threshdt,
self._alpha,
self._n,
self._elevs,
)