from collections.abc import Sequence
from typing import Literal
import numpy as np
import pandas as pd
from numpy.typing import ArrayLike
from numpy.typing import NDArray
from landlab.components.flow_director.flow_director_steepest import FlowDirectorSteepest
"""
A collection of tools for mapping values (e.g., flow, shear stress) between
network model grid and raster model grid representations of a channel network.
"""
[docs]
def get_link_nodes(nmgrid):
"""Get the downstream (head) and upstream (tail) nodes at a link from
flow director. The network model grid nodes_at_link attribute may not be
ordered according to flow direction. Output from this function should be
used for all channel_network_grid_tools functions that require a link_nodes
input
Parameters
----------
nmgrid : network model grid
Returns
-------
link_nodes : np.array
for a nmgrid of n nodes, returns a nx2 np array, the ith row of the
array is the [downstream node id, upstream node id] of the ith link
"""
fd = FlowDirectorSteepest(nmgrid, "topographic__elevation")
fd.run_one_step()
return np.column_stack(
(fd.downstream_node_at_link(), fd.upstream_node_at_link())
).astype(int, copy=False)
def _link_to_points_and_dist(
point_0: tuple[float, float],
point_1: tuple[float, float],
number_of_points: int = 1000,
):
"""Given two points defined by coordinates x0,y0 and x1,y1, define a series
of points between them and the distance from point x0,y0 to each point.
Parameters
----------
point_0 : tuple of 2 floats
point 0 coordinates x and y
point_1 : tuple of 2 floats
point 1 coordinates x and y
number_of_points : int
number of points to create along the reach. The default is 1000.
Returns
-------
X : np array
x coordinate of points
Y : np array
y coordinate of points
dist : np array
linear distance between points
"""
x0 = point_0[0]
y0 = point_0[1]
x1 = point_1[0]
y1 = point_1[1]
X = np.linspace(x0, x1, number_of_points)
Y = np.linspace(y0, y1, number_of_points)
dist = np.hypot(X - x0, Y - y0)
return X, Y, dist
def _dist_func(x0, x1, y0, y1):
return np.hypot(x0 - x1, y0 - y1)
[docs]
def min_distance_to_network(grid, acn, node_id):
"""Determine the shortest distance (as the crow flies) from a node to the
channel network and the closest channel node
Parameters
----------
grid : raster model grid
acn : list of int
array of all node ids included in the channel network
node_id : int
ID of node from which the distance will be determined
Returns
-------
offset : float
distance between node and channel network
mdn : int
ID of channel node that is closest node
"""
x0, y0 = grid.node_x[node_id], grid.node_y[node_id]
x_acn, y_acn = grid.node_x[acn], grid.node_y[acn]
dist = np.hypot(x_acn - x0, y_acn - y0)
idx = np.argmin(dist)
offset = dist[idx]
mdn = acn[idx]
return float(offset), int(mdn)
[docs]
def choose_from_repeated(
sorted_array: ArrayLike,
choose: Literal["first", "last"] = "last",
) -> NDArray[np.bool_]:
"""Mark the first/last element of repeated values in a **sorted** 1-D array.
Parameters
----------
sorted_array : array_like
Assumed sorted by the grouping key.
choose : {'first','last'}, optional
Whether to mark the first or last item of each run.
Examples
--------
>>> array = [0, 0, 0, 2, 2, 5, 6, 6, 6, 6, 6]
>>> is_last = choose_from_repeated(array, choose="last")
>>> is_last.astype(int)
array([0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 1])
"""
a = np.asarray(sorted_array).ravel()
same_as_previous = np.zeros(a.size, dtype=bool)
if a.size <= 1:
return np.ones(a.size, dtype=bool)
same_as_previous[1:] = a[1:] == a[:-1]
if choose == "last":
keep_mask = np.ones_like(same_as_previous)
keep_mask[:-1] = ~same_as_previous[1:]
elif choose == "first":
keep_mask = ~same_as_previous
else:
raise ValueError(f"choose must be 'first' or 'last', got {choose!r}")
return keep_mask
[docs]
def choose_unique(
values: ArrayLike,
order_by: Sequence[ArrayLike] | None = None,
choose: Literal["first", "last"] = "last",
) -> NDArray[np.intp]:
"""Find indices of unique values, selecting one representative if repeated.
Examples
--------
>>> choose_unique([0, 1, 0, 0, 1], order_by=([10.0, 11.0, 12.0, 13.0, 14],))
array([3, 4])
>>> choose_unique([1, 0, 0, 1, 0], order_by=([10.0, 11.0, 12.0, 13.0, 14],))
array([3, 4])
"""
values = np.asarray(values).ravel()
order_by = (
() if order_by is None else tuple(np.asarray(key).ravel() for key in order_by)
)
if any(key.size != values.size for key in order_by):
raise ValueError("All `order_by` arrays must match `values` length")
sorted_rows = np.lexsort(order_by + (values,))
is_last = choose_from_repeated(values[sorted_rows], choose=choose)
return np.sort(sorted_rows[is_last])
[docs]
def map_nmg_links_to_rmg_coincident_nodes(
grid, nmgrid, link_nodes, remove_duplicates=False
):
"""Map links of a network model grid to all coincident raster model grid
nodes. Each coincident raster model grid node is defined in terms of its
x and y coordinates, the link it is mapped to and distance downstream from
the upstream (tail) end of the link.
Parameters
----------
grid : raster model grid
nmgrid : network model grid
link_nodes : np array
head and tail node of each link
remove_duplicates : bool
if True, when two or more links are coincident with the same node,
the node is assigned to the link with the largest drainage area. If False,
the node is assigned to each coincident link. The default is False.
Returns
-------
nmg_link_to_rmg_coincident_nodes_mapper: pandas dataframe
each row of the dataframe lists the link ID, the coincident node ID, the
x and y coordinates and the downstream distance of the coincident node
and the drainage area of the link
"""
Lxy = [] # list of all nodes and node attributes that coincide with the
# network model grid links
# loop through all links in network model grid to determine raster grid cells
# coincident with each link and equivalent distance from upstream (tail) node
for linkID, lknd in enumerate(link_nodes): # for each link in network grid
x0 = nmgrid.x_of_node[lknd[0]] # x and y of downstream link node
y0 = nmgrid.y_of_node[lknd[0]]
x1 = nmgrid.x_of_node[lknd[1]] # x and y of upstream link node
y1 = nmgrid.y_of_node[lknd[1]]
# x and y coordinates and downstream distance from the upstream (tail)
# node for 1000 points generated from downstream node to upstream node
X, Y, dist = _link_to_points_and_dist((x0, y0), (x1, y1), number_of_points=1000)
dist = dist.max() - dist # convert to distance from tail node
nodelist = [] # list of nodes along link
for i, y in enumerate(Y):
x = X[i]
node = grid.find_nearest_node((x, y))
# if node not already in list, append - many points will be in same cell;
# only need to list cell once
if node not in nodelist:
nodelist.append(node)
xy = {
"linkID": linkID,
"coincident_node": node,
"x": grid.node_x[node],
"y": grid.node_y[node],
"dist": dist[i],
"drainage_area": nmgrid.at_link["drainage_area"][linkID],
}
Lxy.append(xy)
df = pd.DataFrame(Lxy)
# if remove_duplicates, remove duplicate node id from link with smaller
# contributing area.
if remove_duplicates:
values = df["coincident_node"].to_numpy()
area = df["drainage_area"].to_numpy()
idx = choose_unique(values=values, order_by=[area], choose="last")
idx.sort()
df = df.iloc[idx].reset_index(drop=True)
return df