"""Landlab component to calculate drainage density."""
from warnings import warn
import numpy as np
from landlab import Component
[docs]
class DrainageDensity(Component):
    r"""
    Calculate drainage density over a DEM.
    Landlab component that implements the distance to channel algorithm of
    Tucker et al., 2001.
    This component requires EITHER a ``channel__mask array`` with 1's
    where channels exist and 0's elsewhere, OR a set of coefficients
    and exponents for a slope-area relationship and a
    channelization threshold to compare against that relationship.
    If an array is provided it MUST be of type ``np.uint8``. See the example
    below for how to make such an array.
    The ``channel__mask`` array will be assigned to an at-node field with the
    name ``channel__mask``. If the channel__mask was originaly created from a
    passed array, a user can update this array to change the mask.
    If the ``channel__mask`` is created using an area coefficent,
    slope coefficient, area exponent, slope exponent, and channelization
    threshold, the location of the mask will be re-update when
    calculate_drainage_density is called.
    If an area coefficient, :math:`C_A`, a slope coefficent, :math:`C_S`, an
    area exponent, :math:`m_r`, a slope exponent, :math:`n_r`, and
    channelization threshold :math:`T_C` are provided, nodes that meet the
    criteria
    .. math::
       C_A A^{m_r} C_s S^{n_r} > T_c
    where :math:`A` is the drainage density and :math:`S` is the local slope,
    will be marked as channel nodes.
    The ``calculate_drainage_density`` function returns drainage density for the
    model domain. This function calculates the distance from every node to the
    nearest channel node :math:`L` along the flow line of steepest descent
    (assuming D8 routing if the grid is a RasterModelGrid).
    This component stores this distance a field, called:
    ``surface_to_channel__minimum_distance``. The drainage density is then
    calculated (after Tucker et al., 2001):
    .. math::
       D_d = \frac{1}{2\overline{L}}
    where :math:`\overline{L}` is the mean L for the model domain.
    Examples
    --------
    >>> import numpy as np
    >>> from landlab import RasterModelGrid
    >>> from landlab.components import FlowAccumulator, FastscapeEroder
    >>> mg = RasterModelGrid((10, 10))
    >>> _ = mg.add_zeros("node", "topographic__elevation")
    >>> np.random.seed(50)
    >>> noise = np.random.rand(100)
    >>> mg.at_node["topographic__elevation"] += noise
    >>> mg.at_node["topographic__elevation"].reshape(mg.shape)
    array([[0.49460165, 0.2280831 , 0.25547392, 0.39632991, 0.3773151 ,
            0.99657423, 0.4081972 , 0.77189399, 0.76053669, 0.31000935],
           [0.3465412 , 0.35176482, 0.14546686, 0.97266468, 0.90917844,
            0.5599571 , 0.31359075, 0.88820004, 0.67457307, 0.39108745],
           [0.50718412, 0.5241035 , 0.92800093, 0.57137307, 0.66833757,
            0.05225869, 0.3270573 , 0.05640164, 0.17982769, 0.92593317],
           [0.93801522, 0.71409271, 0.73268761, 0.46174768, 0.93132927,
            0.40642024, 0.68320577, 0.64991587, 0.59876518, 0.22203939],
           [0.68235717, 0.8780563 , 0.79671726, 0.43200225, 0.91787822,
            0.78183368, 0.72575028, 0.12485469, 0.91630845, 0.38771099],
           [0.29492955, 0.61673141, 0.46784623, 0.25533891, 0.83899589,
            0.1786192 , 0.22711417, 0.65987645, 0.47911625, 0.07344734],
           [0.13896007, 0.11230718, 0.47778497, 0.54029623, 0.95807105,
            0.58379231, 0.52666409, 0.92226269, 0.91925702, 0.25200886],
           [0.68263261, 0.96427612, 0.22696165, 0.7160172 , 0.79776011,
            0.9367512 , 0.8537225 , 0.42154581, 0.00543987, 0.03486533],
           [0.01390537, 0.58890993, 0.3829931 , 0.11481895, 0.86445401,
            0.82165703, 0.73749168, 0.84034417, 0.4015291 , 0.74862   ],
           [0.55962945, 0.61323757, 0.29810165, 0.60237917, 0.42567684,
            0.53854438, 0.48672986, 0.49989164, 0.91745948, 0.26287702]])
    >>> fr = FlowAccumulator(mg, flow_director="D8")
    >>> fsc = FastscapeEroder(mg, K_sp=0.01, m_sp=0.5, n_sp=1)
    >>> for x in range(100):
    ...     fr.run_one_step()
    ...     fsc.run_one_step(dt=10.0)
    ...     mg.at_node["topographic__elevation"][mg.core_nodes] += 0.01
    ...
    >>> channels = np.array(mg.at_node["drainage_area"] > 5, dtype=np.uint8)
    >>> dd = DrainageDensity(mg, channel__mask=channels)
    >>> mean_drainage_density = dd.calculate_drainage_density()
    >>> np.isclose(mean_drainage_density, 0.3831100571)
    True
    Alternatively you can pass a set of coefficients to identify the channel
    mask. Next shows the same example as above, but with these coefficients
    provided.
    >>> mg = RasterModelGrid((10, 10))
    >>> _ = mg.add_zeros("node", "topographic__elevation")
    >>> np.random.seed(50)
    >>> noise = np.random.rand(100)
    >>> mg.at_node["topographic__elevation"] += noise
    >>> fr = FlowAccumulator(mg, flow_director="D8")
    >>> fsc = FastscapeEroder(mg, K_sp=0.01, m_sp=0.5, n_sp=1)
    >>> for x in range(100):
    ...     fr.run_one_step()
    ...     fsc.run_one_step(dt=10.0)
    ...     mg.at_node["topographic__elevation"][mg.core_nodes] += 0.01
    ...
    >>> channels = np.array(mg.at_node["drainage_area"] > 5, dtype=np.uint8)
    >>> dd = DrainageDensity(
    ...     mg,
    ...     area_coefficient=1.0,
    ...     slope_coefficient=1.0,
    ...     area_exponent=1.0,
    ...     slope_exponent=0.0,
    ...     channelization_threshold=5,
    ... )
    >>> mean_drainage_density = dd.calculate_drainage_density()
    >>> np.isclose(mean_drainage_density, 0.3831100571)
    True
    References
    ----------
    **Required Software Citation(s) Specific to this Component**
    None Listed
    **Additional References**
    Tucker, G., Catani, F., Rinaldo, A., Bras, R. (2001). Statistical analysis
    of drainage density from digital terrain data. Geomorphology 36(3-4),
    187-202. https://dx.doi.org/10.1016/s0169-555x(00)00056-8
    """
    _name = "DrainageDensity"
    _unit_agnostic = True
    _info = {
        "area_coefficient": {
            "dtype": float,
            "intent": "in",
            "optional": True,
            "units": "-",
            "mapping": "node",
            "doc": "Area coefficient to define channels.",
        },
        "area_exponent": {
            "dtype": float,
            "intent": "in",
            "optional": True,
            "units": "-",
            "mapping": "node",
            "doc": "Area exponent to define channels.",
        },
        "channel__mask": {
            "dtype": np.uint8,
            "intent": "in",
            "optional": True,
            "units": "-",
            "mapping": "node",
            "doc": "Logical map of at which grid nodes channels are present",
        },
        "channelization_threshold": {
            "dtype": float,
            "intent": "in",
            "optional": True,
            "units": "-",
            "mapping": "node",
            "doc": (
                "Channelization threshold for use with area and slope "
                "coefficients and exponents."
            ),
        },
        "flow__link_to_receiver_node": {
            "dtype": int,
            "intent": "in",
            "optional": False,
            "units": "-",
            "mapping": "node",
            "doc": "ID of link downstream of each node, which carries the discharge",
        },
        "flow__receiver_node": {
            "dtype": int,
            "intent": "in",
            "optional": False,
            "units": "-",
            "mapping": "node",
            "doc": "Node array of receivers (node that receives flow from current node)",
        },
        "flow__upstream_node_order": {
            "dtype": int,
            "intent": "in",
            "optional": False,
            "units": "-",
            "mapping": "node",
            "doc": "Node array containing downstream-to-upstream ordered list of node IDs",
        },
        "slope_coefficient": {
            "dtype": float,
            "intent": "in",
            "optional": True,
            "units": "-",
            "mapping": "node",
            "doc": "Slope coefficient to define channels.",
        },
        "slope_exponent": {
            "dtype": float,
            "intent": "in",
            "optional": True,
            "units": "-",
            "mapping": "node",
            "doc": "Slope exponent to define channels.",
        },
        "surface_to_channel__minimum_distance": {
            "dtype": float,
            "intent": "out",
            "optional": False,
            "units": "m",
            "mapping": "node",
            "doc": "Distance from each node to the nearest channel",
        },
        "topographic__steepest_slope": {
            "dtype": float,
            "intent": "in",
            "optional": False,
            "units": "-",
            "mapping": "node",
            "doc": "The steepest *downhill* slope",
        },
    }
[docs]
    def __init__(
        self,
        grid,
        channel__mask=None,
        area_coefficient=None,
        slope_coefficient=None,
        area_exponent=None,
        slope_exponent=None,
        channelization_threshold=None,
    ):
        """Initialize the DrainageDensity component.
        Parameters
        ----------
        grid : ModelGrid
        channel__mask : Array that holds 1's where
            channels exist and 0's elsewhere
        area_coefficient : coefficient to multiply drainage area by,
            for calculating channelization threshold
        slope_coefficient : coefficient to multiply slope by,
            for calculating channelization threshold
        area_exponent : exponent to raise drainage area to,
            for calculating channelization threshold
        slope_exponent : exponent to raise slope to,
            for calculating channelization threshold
        channelization_threshold : threshold value above
            which channels exist
        """
        super().__init__(grid)
        if grid.at_node["flow__receiver_node"].size != grid.size("node"):
            raise NotImplementedError(
                "A route-to-multiple flow director has been "
                "run on this grid. The landlab development team has not "
                "verified that DrainageDensity is compatible with "
                "route-to-multiple methods. Please open a GitHub Issue "
                "to start this process."
            )
        if channel__mask is not None:
            if area_coefficient is not None:
                warn(
                    "Channel mask and area "
                    "coefficient supplied. Defaulting "
                    "to channel mask, ignoring area "
                    "coefficient."
                )
            if slope_coefficient is not None:
                warn(
                    "Channel mask and slope "
                    "coefficient supplied. Defaulting "
                    "to channel mask, ignoring slope "
                    "coefficient."
                )
            if area_exponent is not None:
                warn(
                    "Channel mask and area "
                    "exponent supplied. Defaulting "
                    "to channel mask, ignoring area "
                    "exponent."
                )
            if slope_exponent is not None:
                warn(
                    "Channel mask and slope "
                    "exponent supplied. Defaulting "
                    "to channel mask, ignoring slope "
                    "exponent."
                )
            if channelization_threshold is not None:
                warn(
                    "Channel mask and channelization "
                    "threshold supplied. Defaulting "
                    "to channel mask, ignoring "
                    "threshold."
                )
            if grid.number_of_nodes != len(channel__mask):
                raise ValueError(
                    "Length of channel mask is not equal to " "number of grid nodes"
                )
            if "channel__mask" in grid.at_node:
                warn("Existing channel__mask grid field was overwritten.")
            if channel__mask.dtype.type is not np.uint8:
                raise ValueError("mask must by np.uint8")
            self._mask_as_array = True
            self._update_channel_mask = self._update_channel_mask_array
            grid.at_node["channel__mask"] = channel__mask
        if channel__mask is None:
            if area_coefficient is None:
                raise ValueError(
                    "No channel mask and no area "
                    "coefficient supplied. Either "
                    "a channel mask or all 5 threshold "
                    "parameters are needed."
                )
            if slope_coefficient is None:
                raise ValueError(
                    "No channel mask and no slope "
                    "coefficient supplied. Either "
                    "a channel mask or all 5 threshold "
                    "parameters are needed."
                )
            if area_exponent is None:
                raise ValueError(
                    "No channel mask and no area "
                    "exponent supplied. Either "
                    "a channel mask or all 5 threshold "
                    "parameters are needed."
                )
            if slope_exponent is None:
                raise ValueError(
                    "No channel mask and no slope "
                    "exponent supplied. Either "
                    "a channel mask or all 5 threshold "
                    "parameters are needed."
                )
            if channelization_threshold is None:
                raise ValueError(
                    "No channel mask and no channelization "
                    "threshold supplied. Either "
                    "a channel mask or all 5 threshold "
                    "parameters are needed."
                )
            self._mask_as_array = False
            self._update_channel_mask = self._update_channel_mask_values
            self._area_coefficient = area_coefficient
            self._slope_coefficient = slope_coefficient
            self._area_exponent = area_exponent
            self._slope_exponent = slope_exponent
            self._channelization_threshold = channelization_threshold
            self._update_channel_mask()
        # for this component to work with Cython acceleration,
        # the channel_network must be uint8, not bool...
        self._channel_network = grid.at_node["channel__mask"]
        # Flow receivers
        self._flow_receivers = grid.at_node["flow__receiver_node"]
        # Links to receiver nodes
        self._stack_links = grid.at_node["flow__link_to_receiver_node"]
        # Upstream node order
        self._upstream_order = grid.at_node["flow__upstream_node_order"]
        # Distance to channel
        if "surface_to_channel__minimum_distance" not in grid.at_node:
            grid.add_zeros(
                "surface_to_channel__minimum_distance", at="node", dtype=float
            )
        self._distance_to_channel = grid.at_node["surface_to_channel__minimum_distance"]
        # Use the appropriate array for link or d8 lengths
        try:
            self._length_of_link = self._grid.length_of_d8
        except AttributeError:
            self._length_of_link = self._grid.length_of_link
 
    def _update_channel_mask_array(self):
        raise NotImplementedError(
            "If you provided a channel mask to "
            "DrainageDensity, update it by updating the "
            "model grid field channel__mask"
        )
    def _update_channel_mask_values(self):
        channel__mask = (
            self._area_coefficient
            * np.power(self._grid.at_node["drainage_area"], self._area_exponent)
            * self._slope_coefficient
            * np.power(
                self._grid.at_node["topographic__steepest_slope"], self._slope_exponent
            )
        ) > self._channelization_threshold
        self._grid.at_node["channel__mask"] = channel__mask.astype(np.uint8)
[docs]
    def calculate_drainage_density(self):
        """Calculate drainage density.
        If the channel mask is defined based on slope and area coefficients,
        it will be update based on the current drainage area and slope fields.
        Returns
        -------
        landscape_drainage_density : float (1/m)
            Drainage density over the model domain.
        """
        from .cfuncs import _calc_dists_to_channel
        if self._mask_as_array is False:
            self._update_channel_mask()
        _calc_dists_to_channel(
            self._channel_network,
            self._flow_receivers,
            self._upstream_order,
            self._length_of_link,
            self._stack_links,
            self._distance_to_channel,
            self._grid.number_of_nodes,
        )
        landscape_drainage_density = 1.0 / (
            2.0
            * np.mean(
                self._grid.at_node["surface_to_channel__minimum_distance"][
                    self._grid.core_nodes
                ]
            )
        )
        # this is THE drainage density
        return landscape_drainage_density