#!/usr/env/python
"""Landlab component that simulates landslide probability of failure as well as
mean relative wetness and probability of saturation.
Relative wetness and factor-of-safety are based on the infinite slope
stability model driven by topographic and soils inputs and recharge provided
by user as inputs to the component. For each node, component simulates mean
relative wetness as well as the probability of saturation based on Monte Carlo
simulation of relative wetness where the probability is the number of
iterations with relative wetness >= 1.0 divided by the number of iterations.
Probability of failure for each node is also simulated in the Monte Carlo
simulation as the number of iterations with factor-of-safety <= 1.0
divided by the number of iterations.
.. codeauthor:: R.Strauch, E.Istanbulluoglu, & S.S.Nudurupati
University of Washington
Ref 1: Strauch et. al. 2017, 'A hydro-climatological approach to predicting
regional landslide probability using Landlab, Earth Surface Dynamics, In prep.
Ref 2: 'The Landlab LandslideProbability Component User Manual' @
https://github.com/RondaStrauch/pub_strauch_etal_esurf/blob/master/LandslideComponentUsersManual.pdf
Created on Thu Aug 20, 2015
Last edit June 7, 2017
"""
import copy
import numpy as np
import scipy.constants
from scipy import interpolate
from statsmodels.distributions.empirical_distribution import ECDF
from landlab import Component
[docs]
class LandslideProbability(Component):
"""Landslide probability component using the infinite slope stability
model.
Landlab component designed to calculate probability of failure at
each grid node based on the infinite slope stability model
stability index (Factor of Safety).
The driving force for failure is provided by the user in the form of
groundwater recharge; four options for providing recharge are supported.
The model uses topographic and soil characteristics provided as input
by the user.
The main method of the LandslideProbability class is
`calculate_landslide_probability()``, which calculates the mean soil
relative wetness, probability of soil saturation, and probability of
failure at each node based on a Monte Carlo simulation.
**Usage:**
Option 1 - Uniform recharge
.. code-block:: python
LandslideProbability(
grid,
number_of_iterations=250,
groundwater__recharge_distribution="uniform",
groundwater__recharge_min_value=5.0,
groundwater__recharge_max_value=121.0,
)
Option 2 - Lognormal recharge
.. code-block:: python
LandslideProbability(
grid,
number_of_iterations=250,
groundwater__recharge_distribution="lognormal",
groundwater__recharge_mean=30.0,
groundwater__recharge_standard_deviation=0.25,
)
Option 3 - Lognormal_spatial recharge
.. code-block:: python
LandslideProbability(
grid,
number_of_iterations=250,
groundwater__recharge_distribution="lognormal_spatial",
groundwater__recharge_mean=np.random.randint(20, 120, grid_size),
groundwater__recharge_standard_deviation=np.random.rand(grid_size),
)
Option 4 - Data_driven_spatial recharge
.. code-block:: python
LandslideProbability(
grid,
number_of_iterations=250,
groundwater__recharge_distribution="data_driven_spatial",
groundwater__recharge_HSD_inputs=[HSD_dict, HSD_id_dict, fract_dict],
)
Examples
--------
>>> from landlab import RasterModelGrid
>>> from landlab.components.landslides import LandslideProbability
>>> import numpy as np
Create a grid on which to calculate landslide probability.
>>> grid = RasterModelGrid((5, 4), xy_spacing=(0.2, 0.2))
Check the number of core nodes.
>>> grid.number_of_core_nodes
6
The grid will need some input data. To check the names of the fields
that provide the input to this component, use the *input_var_names*
class property.
>>> sorted(LandslideProbability.input_var_names)
['soil__density',
'soil__internal_friction_angle',
'soil__maximum_total_cohesion',
'soil__minimum_total_cohesion',
'soil__mode_total_cohesion',
'soil__saturated_hydraulic_conductivity',
'soil__thickness',
'soil__transmissivity',
'topographic__slope',
'topographic__specific_contributing_area']
Check the units for the fields.
>>> LandslideProbability.var_units("topographic__specific_contributing_area")
'm'
Create an input field.
>>> grid.at_node["topographic__slope"] = np.random.rand(grid.number_of_nodes)
If you are not sure about one of the input or output variables, you can
get help for specific variables.
>>> LandslideProbability.var_help("soil__transmissivity")
name: soil__transmissivity
description:
mode rate of water transmitted through a unit width of saturated
soil - either provided or calculated with Ksat and soil depth
units: m2/day
unit agnostic: False
at: node
intent: in
Additional required fields for component.
>>> scatter_dat = np.random.randint(1, 10, grid.number_of_nodes)
>>> grid.at_node["topographic__specific_contributing_area"] = np.sort(
... np.random.randint(30, 900, grid.number_of_nodes).astype(float)
... )
>>> grid.at_node["soil__transmissivity"] = np.sort(
... np.random.randint(5, 20, grid.number_of_nodes).astype(float), -1
... )
>>> grid.at_node["soil__saturated_hydraulic_conductivity"] = np.sort(
... np.random.randint(2, 10, grid.number_of_nodes).astype(float), -1
... )
>>> grid.at_node["soil__mode_total_cohesion"] = np.sort(
... np.random.randint(30, 900, grid.number_of_nodes).astype(float)
... )
>>> grid.at_node["soil__minimum_total_cohesion"] = (
... grid.at_node["soil__mode_total_cohesion"] - scatter_dat
... )
>>> grid.at_node["soil__maximum_total_cohesion"] = (
... grid.at_node["soil__mode_total_cohesion"] + scatter_dat
... )
>>> grid.at_node["soil__internal_friction_angle"] = np.sort(
... np.random.randint(26, 40, grid.number_of_nodes).astype(float)
... )
>>> grid.at_node["soil__thickness"] = np.sort(
... np.random.randint(1, 10, grid.number_of_nodes).astype(float)
... )
>>> grid.at_node["soil__density"] = 2000.0 * np.ones(grid.number_of_nodes)
Instantiate the 'LandslideProbability' component to work on this grid,
and run it.
>>> ls_prob = LandslideProbability(grid)
>>> np.allclose(grid.at_node["landslide__probability_of_failure"], 0.0)
True
Run the *calculate_landslide_probability* method to update output
variables with grid
>>> ls_prob.calculate_landslide_probability()
Check the output variable names.
>>> sorted(ls_prob.output_var_names)
['landslide__probability_of_failure',
'soil__mean_relative_wetness',
'soil__probability_of_saturation']
Check the output from the component, including array at one node.
>>> np.allclose(grid.at_node["landslide__probability_of_failure"], 0.0)
False
>>> core_nodes = ls_prob.grid.core_nodes
References
----------
**Required Software Citation(s) Specific to this Component**
Strauch, R., Istanbulluoglu, E., Nudurupati, S., Bandaragoda, C.,
Gasparini, N., Tucker, G. (2018). A hydroclimatological approach to
predicting regional landslide probability using Landlab Earth Surface
Dynamics 6(1), 49-75. https://dx.doi.org/10.5194/esurf-6-49-2018
**Additional References**
None Listed
"""
# component name
_name = "Landslide Probability"
_unit_agnostic = False
__version__ = "1.0"
_cite_as = """
@article{strauch2018hydroclimatological,
author = {Strauch, Ronda and Istanbulluoglu, Erkan and Nudurupati,
Sai Siddhartha and Bandaragoda, Christina and Gasparini, Nicole M and
Tucker, Gregory E},
title = {{A hydroclimatological approach to predicting regional landslide
probability using Landlab}},
issn = {2196-6311},
doi = {10.5194/esurf-6-49-2018},
pages = {49--75},
number = {1},
volume = {6},
journal = {Earth Surface Dynamics},
year = {2018}
}
"""
_info = {
"landslide__probability_of_failure": {
"dtype": float,
"intent": "out",
"optional": False,
"units": "None",
"mapping": "node",
"doc": "number of times FS is <=1 out of number of iterations user selected",
},
"soil__density": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "kg/m3",
"mapping": "node",
"doc": "wet bulk density of soil",
},
"soil__internal_friction_angle": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "degrees",
"mapping": "node",
"doc": "critical angle just before failure due to friction between particles",
},
"soil__maximum_total_cohesion": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "Pa or kg/m-s2",
"mapping": "node",
"doc": "maximum of combined root and soil cohesion at node",
},
"soil__mean_relative_wetness": {
"dtype": float,
"intent": "out",
"optional": False,
"units": "None",
"mapping": "node",
"doc": (
"Indicator of soil wetness; relative depth perched water table "
"within the soil layer"
),
},
"soil__minimum_total_cohesion": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "Pa or kg/m-s2",
"mapping": "node",
"doc": "minimum of combined root and soil cohesion at node",
},
"soil__mode_total_cohesion": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "Pa or kg/m-s2",
"mapping": "node",
"doc": "mode of combined root and soil cohesion at node",
},
"soil__probability_of_saturation": {
"dtype": float,
"intent": "out",
"optional": False,
"units": "None",
"mapping": "node",
"doc": (
"number of times relative wetness is >=1 out of number of "
"iterations user selected"
),
},
"soil__saturated_hydraulic_conductivity": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "m/day",
"mapping": "node",
"doc": (
"mode rate of water transmitted through soil - provided if "
"transmissivity is NOT provided to calculate tranmissivity "
"with soil depth"
),
},
"soil__thickness": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "m",
"mapping": "node",
"doc": "soil depth to restrictive layer",
},
"soil__transmissivity": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "m2/day",
"mapping": "node",
"doc": (
"mode rate of water transmitted through a unit width of saturated "
"soil - either provided or calculated with Ksat and soil depth"
),
},
"topographic__slope": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "tan theta",
"mapping": "node",
"doc": "gradient of the ground surface",
},
"topographic__specific_contributing_area": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "m",
"mapping": "node",
"doc": "specific contributing (upslope area/cell face ) that drains to node",
},
}
[docs]
def __init__(
self,
grid,
number_of_iterations=250,
g=scipy.constants.g,
groundwater__recharge_distribution="uniform",
groundwater__recharge_min_value=20.0,
groundwater__recharge_max_value=120.0,
groundwater__recharge_mean=None,
groundwater__recharge_standard_deviation=None,
groundwater__recharge_HSD_inputs=(),
seed=0,
):
"""
Parameters
----------
grid: RasterModelGrid
A raster grid.
number_of_iterations: int, optional
Number of iterations to run Monte Carlo simulation (default=250).
groundwater__recharge_distribution: str, optional
single word indicating recharge distribution, either 'uniform',
'lognormal', 'lognormal_spatial,' or 'data_driven_spatial'.
(default='uniform')
groundwater__recharge_min_value: float, optional (mm/d)
minium groundwater recharge for 'uniform' (default=20.)
groundwater__recharge_max_value: float, optional (mm/d)
maximum groundwater recharge for 'uniform' (default=120.)
groundwater__recharge_mean: float, optional (mm/d)
mean grounwater recharge for 'lognormal'
and 'lognormal_spatial' (default=None)
groundwater__recharge_standard_deviation: float, optional (mm/d)
standard deviation of grounwater recharge for 'lognormal'
and 'lognormal_spatial' (default=None)
groundwater__recharge_HSD_inputs: list, optional
list of 3 dictionaries in order (default=[]) - HSD_dict
{Hydrologic Source Domain (HSD) keys: recharge numpy array values},
{node IDs keys: list of HSD_Id values}, HSD_fractions {node IDS
keys: list of HSD fractions values} (none)
Note: this input method is a very specific one, and to use this method,
one has to refer Ref 1 & Ref 2 mentioned above, as this set of
inputs require rigorous pre-processing of data.
g: float, optional (m/sec^2)
acceleration due to gravity.
seed: int, optional
seed for random number generation. if seed is assigned any value
other than the default value of zero, it will create different
sequence. To create a certain sequence repititively, use the same
value as input for seed.
"""
# Initialize seeded random number generation
self._seed_generator(seed)
super().__init__(grid)
# Store parameters and do unit conversions
self._n = int(number_of_iterations)
self._g = g
self._groundwater__recharge_distribution = groundwater__recharge_distribution
# Following code will deal with the input distribution and associated
# parameters
# Uniform distribution
if self._groundwater__recharge_distribution == "uniform":
self._recharge_min = groundwater__recharge_min_value
self._recharge_max = groundwater__recharge_max_value
self._Re = np.random.uniform(
self._recharge_min, self._recharge_max, size=self._n
)
self._Re /= 1000.0 # Convert mm to m
# Lognormal Distribution - Uniform in space
elif self._groundwater__recharge_distribution == "lognormal":
assert (
groundwater__recharge_mean is not None
), "Input mean of the distribution!"
assert (
groundwater__recharge_standard_deviation is not None
), "Input standard deviation of the distribution!"
self._recharge_mean = groundwater__recharge_mean
self._recharge_stdev = groundwater__recharge_standard_deviation
self._mu_lognormal = np.log(
(self._recharge_mean**2)
/ np.sqrt(self._recharge_stdev**2 + self._recharge_mean**2)
)
self._sigma_lognormal = np.sqrt(
np.log((self._recharge_stdev**2) / (self._recharge_mean**2) + 1)
)
self._Re = np.random.lognormal(
self._mu_lognormal, self._sigma_lognormal, self._n
)
self._Re /= 1000.0 # Convert mm to m
# Lognormal Distribution - Variable in space
elif self._groundwater__recharge_distribution == "lognormal_spatial":
assert groundwater__recharge_mean.shape[0] == (
self._grid.number_of_nodes
), "Input array should be of the length of grid.number_of_nodes!"
assert groundwater__recharge_standard_deviation.shape[0] == (
self._grid.number_of_nodes
), "Input array should be of the length of grid.number_of_nodes!"
self._recharge_mean = groundwater__recharge_mean
self._recharge_stdev = groundwater__recharge_standard_deviation
# Custom HSD inputs - Hydrologic Source Domain -> Model Domain
elif self._groundwater__recharge_distribution == "data_driven_spatial":
self._HSD_dict = groundwater__recharge_HSD_inputs[0]
self._HSD_id_dict = groundwater__recharge_HSD_inputs[1]
self._fract_dict = groundwater__recharge_HSD_inputs[2]
self._interpolate_HSD_dict()
# Check if all output fields are initialized
self.initialize_output_fields()
# Create a switch to imply whether Ksat is provided.
if np.all(self._grid.at_node["soil__saturated_hydraulic_conductivity"] == 0):
self._Ksat_provided = 0 # False
else:
self._Ksat_provided = 1 # True
self._nodal_values = self._grid.at_node
[docs]
def calculate_factor_of_safety(self, i):
"""Method to calculate factor of safety.
Method calculates factor-of-safety stability index by using
node specific parameters, creating distributions of these parameters,
and calculating the index by sampling these distributions 'n' times.
The index is calculated from the 'infinite slope stabilty
factor-of-safety equation' in the format of Pack RT, Tarboton DG,
and Goodwin CN (1998),The SINMAP approach to terrain stability mapping.
Parameters
----------
i: int
index of core node ID.
"""
# generate distributions to sample from to provide input parameters
# currently triangle distribution using mode, min, & max
self._a = np.float32(
self._grid.at_node["topographic__specific_contributing_area"][i]
)
self._theta = np.float32(self._grid.at_node["topographic__slope"][i])
self._Tmode = np.float32(self._grid.at_node["soil__transmissivity"][i])
self._Ksatmode = np.float32(
self._grid.at_node["soil__saturated_hydraulic_conductivity"][i]
)
self._Cmode = np.float32(self._grid.at_node["soil__mode_total_cohesion"][i])
self._Cmin = np.float32(self._grid.at_node["soil__minimum_total_cohesion"][i])
self._Cmax = np.float32(self._grid.at_node["soil__maximum_total_cohesion"][i])
self._phi_mode = np.float32(
self._grid.at_node["soil__internal_friction_angle"][i]
)
self._rho = np.float32(self._grid.at_node["soil__density"][i])
self._hs_mode = np.float32(self._grid.at_node["soil__thickness"][i])
# recharge distribution based on distribution type
if self._groundwater__recharge_distribution == "data_driven_spatial":
self._calculate_HSD_recharge(i)
self._Re /= 1000.0 # mm->m
elif self._groundwater__recharge_distribution == "lognormal_spatial":
mu_lognormal = np.log(
(self._recharge_mean[i] ** 2)
/ np.sqrt(self._recharge_stdev[i] ** 2 + self._recharge_mean[i] ** 2)
)
sigma_lognormal = np.sqrt(
np.log(
(self._recharge_stdev[i] ** 2) / (self._recharge_mean[i] ** 2) + 1
)
)
self._Re = np.random.lognormal(mu_lognormal, sigma_lognormal, self._n)
self._Re /= 1000.0 # Convert mm to m
# Cohesion
# if don't provide fields of min and max C, uncomment 2 lines below
# Cmin = self._Cmode-0.3*self._Cmode
# Cmax = self._Cmode+0.3*self._Cmode
self._C = np.random.triangular(
self._Cmin, self._Cmode, self._Cmax, size=self._n
)
# phi - internal angle of friction provided in degrees
phi_min = self._phi_mode - 0.18 * self._phi_mode
phi_max = self._phi_mode + 0.32 * self._phi_mode
self._phi = np.random.triangular(phi_min, self._phi_mode, phi_max, size=self._n)
# soil thickness
# hs_min = min(0.005, self._hs_mode-0.3*self._hs_mode) # Alternative
hs_min = self._hs_mode - 0.3 * self._hs_mode
hs_max = self._hs_mode + 0.1 * self._hs_mode
self._hs = np.random.triangular(hs_min, self._hs_mode, hs_max, size=self._n)
self._hs[self._hs <= 0.0] = 0.005
if self._Ksat_provided:
# Hydraulic conductivity (Ksat)
Ksatmin = self._Ksatmode - (0.3 * self._Ksatmode)
Ksatmax = self._Ksatmode + (0.1 * self._Ksatmode)
self._Ksat = np.random.triangular(
Ksatmin, self._Ksatmode, Ksatmax, size=self._n
)
self._T = self._Ksat * self._hs
else:
# Transmissivity (T)
Tmin = self._Tmode - (0.3 * self._Tmode)
Tmax = self._Tmode + (0.1 * self._Tmode)
self._T = np.random.triangular(Tmin, self._Tmode, Tmax, size=self._n)
# calculate Factor of Safety for n number of times
# calculate components of FS equation
self._C_dim = self._C / (
self._hs * self._rho * self._g
) # dimensionless cohesion
self._rel_wetness = ((self._Re) / self._T) * (
self._a / np.sin(np.arctan(self._theta))
) # relative wetness
# calculate probability of saturation
countr = 0
for val in self._rel_wetness: # find how many RW values >= 1
if val >= 1.0:
countr = countr + 1 # number with RW values (>=1)
# probability: No. high RW values/total No. of values (n)
self._soil__probability_of_saturation = np.float32(countr) / self._n
# Maximum Rel_wetness = 1.0
np.place(self._rel_wetness, self._rel_wetness > 1, 1.0)
self._soil__mean_relative_wetness = np.mean(self._rel_wetness)
Y = np.tan(np.radians(self._phi)) * (1 - (self._rel_wetness * 0.5))
# convert from degrees; 0.5 = water to soil density ratio
# calculate Factor-of-safety
self._FS = (self._C_dim / np.sin(np.arctan(self._theta))) + (
np.cos(np.arctan(self._theta)) * (Y / np.sin(np.arctan(self._theta)))
)
count = 0
for val in self._FS: # find how many FS values <= 1
if val <= 1.0:
count = count + 1 # number with unstable FS values (<=1)
# probability: No. unstable values/total No. of values (n)
self._landslide__probability_of_failure = np.float32(count) / self._n
[docs]
def calculate_landslide_probability(self):
"""Main method of Landslide Probability class.
Method creates arrays for output variables then loops through
all the core nodes to run the method
'calculate_factor_of_safety.' Output parameters probability of
failure, mean relative wetness, and probability of saturation
are assigned as fields to nodes.
"""
# Create arrays for data with -9999 as default to store output
self._mean_Relative_Wetness = np.full(self._grid.number_of_nodes, -9999.0)
self._prob_fail = np.full(self._grid.number_of_nodes, -9999.0)
self._prob_sat = np.full(self._grid.number_of_nodes, -9999.0)
# Run factor of safety Monte Carlo for all core nodes in domain
# i refers to each core node id
for i in self._grid.core_nodes:
self.calculate_factor_of_safety(i)
# Populate storage arrays with calculated values
self._mean_Relative_Wetness[i] = self._soil__mean_relative_wetness
self._prob_fail[i] = self._landslide__probability_of_failure
self._prob_sat[i] = self._soil__probability_of_saturation
# Values can't be negative
self._mean_Relative_Wetness[self._mean_Relative_Wetness < 0.0] = 0.0
self._prob_fail[self._prob_fail < 0.0] = 0.0
# assign output fields to nodes
self._grid.at_node["soil__mean_relative_wetness"] = self._mean_Relative_Wetness
self._grid.at_node["landslide__probability_of_failure"] = self._prob_fail
self._grid.at_node["soil__probability_of_saturation"] = self._prob_sat
def _seed_generator(self, seed=0):
"""Method to initiate random seed.
Seed the random-number generator. This method will create the
same sequence again by re-seeding with the same value (default
value is zero). To create a sequence other than the default,
assign non-zero value for seed.
"""
np.random.seed(seed)
def _interpolate_HSD_dict(self):
"""Method to extrapolate input data.
This method uses a non-parametric approach to expand the input
recharge array to the length of number of iterations. Output is
a new dictionary of interpolated recharge for each HSD id.
"""
HSD_dict = copy.deepcopy(self._HSD_dict)
# First generate interpolated Re for each HSD grid
Yrand = np.sort(np.random.rand(self._n))
# n random numbers (0 to 1) in a column
for vkey in HSD_dict.keys():
if isinstance(HSD_dict[vkey], int):
continue # loop back up if value is integer (e.g. -9999)
Re_temp = HSD_dict[vkey] # an array of annual Re for 1 HSD grid
Fx = ECDF(Re_temp) # instantiate to get probabilities with Re
Fx_ = Fx(Re_temp) # probability array associated with Re data
# interpolate function based on recharge data & probability
f = interpolate.interp1d(
Fx_, Re_temp, bounds_error=False, fill_value=min(Re_temp)
)
# array of Re interpolated from Yrand probabilities (n count)
Re_interpolated = f(Yrand)
# replace values in HSD_dict with interpolated Re
HSD_dict[vkey] = Re_interpolated
self._interpolated_HSD_dict = HSD_dict
def _calculate_HSD_recharge(self, i):
"""Method to calculate recharge based on upstream fractions.
This method calculates the resultant recharge at node i of the
model domain, using recharge of contributing HSD ids and the
areal fractions of upstream contributing HSD ids. Output is a
numpy array of recharge at node i.
"""
store_Re = np.zeros(self._n)
HSD_id_list = self._HSD_id_dict[i]
fract_list = self._fract_dict[i]
for j in range(0, len(HSD_id_list)):
Re_temp = self._interpolated_HSD_dict[HSD_id_list[j]]
fract_temp = fract_list[j]
Re_adj = Re_temp * fract_temp
store_Re = np.vstack((store_Re, np.array(Re_adj)))
self._Re = np.sum(store_Re, 0)