# Source code for landlab.components.species_evolution.zone

#!/usr/bin/env python
"""Zone functions and class of SpeciesEvolver."""
from collections import OrderedDict
from enum import IntEnum
from enum import unique

import numpy as np
from pandas import Series


@unique
class Connection(IntEnum):
    """Zone connection type.

    A connection type describes how zones of two consecutive time steps are
    connected. It follows the pattern ``x-to-y``, where ``x`` and ``y`` are
    the descriptive counts (none, one, or many) of zones at the earlier and
    the later time step, respectively.

    For example, ``ONE_TO_MANY`` is a zone of the earlier time step that
    spatially overlaps multiple zones of the later time step.
    """

    NONE_TO_NONE = 0
    NONE_TO_ONE = 1
    ONE_TO_NONE = 2
    ONE_TO_ONE = 3
    ONE_TO_MANY = 4
    MANY_TO_ONE = 5
    MANY_TO_MANY = 6
def _update_zones(grid, prior_zones, new_zones, record):
    """Resolve the spatial connectivity of zones across two time steps.

    This function iterates over each zone of the prior time step to identify
    the zones of the current time step it spatially intersects. It updates
    the zone attribute, ``_successors``. Successor zones are the zones
    existing at the current time step that are the continuation of prior time
    step zones, referred to as the predecessor zones. The type of connection
    between predecessor and successor zones is described by ``Connection``.
    The ``_conn_type`` attribute of zones is set by this function.

    In the `many` connections, a rule determines which of the prior zones
    persists as the zone in the current time step: the prior zone with the
    greatest intersection with the new zone persists. Persisting prior zones
    take over the mask of the new zone they continue into.

    Parameters
    ----------
    grid : ModelGrid
        A Landlab ModelGrid. ``number_of_nodes`` and ``cell_area_at_node``
        are read from it.
    prior_zones : Zone list
        The zones of the prior timestep.
    new_zones : Zone list
        The zones of the current timestep.
    record : Record
        The ZoneController record. The values "fragmentations", "captures",
        "area_captured_sum" and "area_captured_max" are updated.

    Returns
    -------
    list of Zones
        The successor zones. The zones determined to exist at the current
        time step with an updated ``successors`` attribute.
    """
    # Stats are calculated for `record`.
    fragment_ct = 0
    capture_ct = 0
    # Seeded with 0 so `sum` and `max` are defined when nothing is captured.
    area_captured = [0]

    prior_ct = len(prior_zones)
    new_ct = len(new_zones)

    # Get `successors`.

    if prior_ct == 0 and new_ct == 0:
        successors = []

    elif prior_ct == 0:
        # Only new zones exist: every new zone is its own successor.
        successors = new_zones

        conn_type = _determine_connection_type(0, 1)

        for n in new_zones:
            n._conn_type = conn_type
            n._successors = [n]

    elif new_ct == 0:
        # Only prior zones exist: none of them continues.
        successors = []

        conn_type = _determine_connection_type(1, 0)

        for p in prior_zones:
            p._conn_type = conn_type
            p._successors = []

    else:
        successors = []

        # Get index maps for the prior zones (`ps`) and new zones (`ns`).
        ps_index_map = _create_index_map(grid, prior_zones)
        ns_index_map = _create_index_map(grid, new_zones)

        # Maps a persisting prior zone to the new zone whose mask it takes.
        replacements = OrderedDict()

        for i_p, p in enumerate(prior_zones):
            # Retain a copy of the prior zone mask to compare with the new
            # zone mask after the zone masks are updated.
            p_mask_copy = p.mask.copy()

            # Get the new zones that intersect (`i`) the prior zone.
            ns_i_p = _intersecting_zones(
                ps_index_map == i_p, ns_index_map, new_zones
            )
            ns_i_p_ct = len(ns_i_p)

            # Get the other prior zones that intersect the new zones.
            # With no intersecting new zone, `np.any([], axis=0)` would be a
            # scalar and stacking it with the index map below is a ragged
            # input that recent NumPy rejects, so that case is handled
            # explicitly.
            if ns_i_p:
                ns_mask = np.any([n.mask for n in ns_i_p], axis=0)
                ps_i_ns = _intersecting_zones(
                    ns_mask, ps_index_map, prior_zones
                )
            else:
                ps_i_ns = []

            # The prior zone always counts as one of the intersecting prior
            # zones, so the prior-zone count is never zero.
            if not ps_i_ns:
                ps_i_ns = [p]
            ps_i_ns_ct = len(ps_i_ns)

            # Get successors depending on connection type.
            conn_type = _determine_connection_type(ps_i_ns_ct, ns_i_p_ct)

            p_successors = _get_successors(
                p,
                conn_type,
                ps_i_ns,
                ns_i_p,
                prior_zones,
                ps_index_map,
                replacements,
                successors,
            )

            # Update statistics.
            if conn_type in (Connection.MANY_TO_ONE, Connection.MANY_TO_MANY):
                # Copy successors to be `captured_zones`.
                captured_zones = list(p_successors)
                if p in captured_zones:
                    captured_zones.remove(p)

                capture_ct += len(captured_zones)

                for z in captured_zones:
                    # Nodes of the captured zone outside of the prior mask.
                    captured_mask = np.logical_and(~p_mask_copy, z.mask)
                    area = grid.cell_area_at_node[captured_mask.flatten()].sum()
                    area_captured.append(area)

            elif conn_type == Connection.ONE_TO_MANY:
                # NOTE(review): the original condition also listed
                # MANY_TO_MANY here, but that type is always consumed by the
                # branch above, so many-to-many connections are never counted
                # as fragmentations. Behavior is kept; confirm the intent.
                fragment_ct += ns_i_p_ct

            # Set connection.
            p._conn_type = conn_type
            p._successors = p_successors

            successors.extend(p_successors)

        # Masks are swapped only after all prior zones were processed so the
        # intersections above are computed with the prior masks.
        for key, value in replacements.items():
            key._mask = value.mask

    # Get unique list of successors, preserving order. The dtype is given
    # explicitly so an empty list does not rely on the pandas default dtype.
    successors = Series(successors, dtype=object).drop_duplicates().tolist()

    # Update the record.
    record.increment_value("fragmentations", fragment_ct)
    record.increment_value("captures", capture_ct)
    record.increment_value("area_captured_sum", sum(area_captured))

    old_value = record.get_value("area_captured_max")
    largest_area = max(area_captured)

    if np.isnan(old_value) or largest_area > old_value:
        record.set_value("area_captured_max", largest_area)

    return successors


def _create_index_map(grid, zones):
    """Map each grid node to the index of the zone that contains it.

    Parameters
    ----------
    grid : ModelGrid
        The grid; only ``number_of_nodes`` is read.
    zones : Zone list
        A non-empty list of zones with node masks of equal length.

    Returns
    -------
    ndarray of int
        For each node, the position in `zones` of the zone whose mask is true
        at the node, or -1 where no zone is present.
    """
    # Row indices are zone positions, column indices are node ids.
    zone_indices, node_indices = np.where([z.mask for z in zones])

    index_map = np.full(grid.number_of_nodes, -1, dtype=int)
    # NOTE(review): presumably zone masks do not overlap; if they do, the
    # zone positioned later in `zones` is the one stored at the shared node.
    index_map[node_indices] = zone_indices

    return index_map


def _intersecting_zones(condition, zone_index_map, zone_list):
    """Get the zones present at the nodes where `condition` is true.

    Parameters
    ----------
    condition : ndarray of bool
        Node mask that selects the nodes of interest.
    zone_index_map : ndarray of int
        Index map of `zone_list` as created by ``_create_index_map``.
    zone_list : Zone list
        The zones that `zone_index_map` refers to.

    Returns
    -------
    list of Zones
        The zones found at the selected nodes, ordered by their index in
        `zone_list`. The list is empty when no zone is found.
    """
    occupied = np.logical_and(condition, zone_index_map > -1)
    indices = np.unique(zone_index_map[occupied])

    return [zone_list[i] for i in indices]


def _determine_connection_type(prior_zone_count, new_zone_count):
    """Get the connection type based on the count of prior and new zones.

    ``None`` is returned for count combinations that no branch handles, for
    example when both counts are 0.
    """
    if prior_zone_count == 0:
        if new_zone_count == 1:
            return Connection.NONE_TO_ONE

    elif prior_zone_count == 1:
        if new_zone_count == 0:
            return Connection.ONE_TO_NONE
        elif new_zone_count == 1:
            return Connection.ONE_TO_ONE
        elif new_zone_count > 1:
            return Connection.ONE_TO_MANY

    elif prior_zone_count > 1:
        if new_zone_count == 1:
            return Connection.MANY_TO_ONE
        elif new_zone_count > 1:
            return Connection.MANY_TO_MANY

    return None


def _get_successors(
    p,
    conn_type,
    ps_i_ns,
    ns_i_p,
    prior_zones,
    ps_index_map,
    replacements,
    all_successors,
):
    """Get the successors of a prior zone and register mask replacements.

    Parameters
    ----------
    p : Zone
        The prior zone.
    conn_type : Connection
        The connection type of `p`. Only the types ONE_TO_NONE, ONE_TO_ONE,
        ONE_TO_MANY, MANY_TO_ONE and MANY_TO_MANY are handled.
    ps_i_ns : Zone list
        The prior zones that intersect the new zones in `ns_i_p`.
    ns_i_p : Zone list
        The new zones that intersect `p`.
    prior_zones, ps_index_map, all_successors
        Not used by this function.
    replacements : OrderedDict
        Maps persisting prior zones to the new zone that replaces their
        mask. It is updated in place.

    Returns
    -------
    list of Zones
        The successors of `p`.
    """
    if conn_type == Connection.ONE_TO_NONE:
        successors = []

    elif conn_type == Connection.ONE_TO_ONE:
        # The prior zone is set as the new zone because only the one new
        # and the one prior overlap.
        n = ns_i_p[0]
        replacements[p] = n
        successors = [p]

    elif conn_type in [Connection.ONE_TO_MANY, Connection.MANY_TO_MANY]:
        # Set the successors to the new zones that overlap p.
        # Although, replace the dominant n with p.

        dn = p._get_largest_intersection(
            ns_i_p, exclusions=list(replacements.values())
        )

        successors = []

        # NOTE(review): the branches for the dominant new zone (`dn`) and
        # for the other new zones perform the same operations; confirm that
        # no distinct handling of `dn` was intended.
        for n in ns_i_p:
            dp = n._get_largest_intersection(
                ps_i_ns, exclusions=list(replacements.keys())
            )

            if n == dn and n in replacements.values():
                # A prior zone already continues into this new zone.
                d = _get_replacement(replacements, n)
                successors.append(d)
            elif n == dn:
                replacements[dp] = n
                successors.append(dp)
            elif n in replacements.values():
                d = _get_replacement(replacements, n)
                successors.append(d)
            else:
                successors.append(dp)
                replacements[dp] = n

    elif conn_type == Connection.MANY_TO_ONE:
        # Set the successor to the prior zone that intersects n the most.
        n = ns_i_p[0]
        dp = n._get_largest_intersection(ps_i_ns)

        if p == dp and n in replacements.values():
            successors = [_get_replacement(replacements, n)]
        elif p == dp:
            successors = [p]
            replacements[p] = n
        elif n in replacements.values():
            successors = [_get_replacement(replacements, n)]
        else:
            successors = [dp]
            replacements[dp] = n

    return successors


def _get_replacement(replacements, new_zone):
    """Get the first key of `replacements` whose value equals `new_zone`.

    ``None`` is returned when `new_zone` is not a value of `replacements`.
    """
    for key, value in replacements.items():
        if value == new_zone:
            return key
    return None
class Zone:
    """Zone object of SpeciesEvolver.

    The nodes and attributes of the spatial entities that taxa populate.

    This class is not intended to be managed directly.
    """

    def __init__(self, controller, mask):
        """Initialize a zone.

        Parameters
        ----------
        controller : ZoneController
            A SpeciesEvolver ZoneController.
        mask : ndarray
            The mask of the zone. True elements of this array correspond to
            the grid nodes of the zone. A flattened copy is stored.
        """
        self._controller = controller
        self._mask = mask.flatten()
        self._conn_type = None
        self._successors = []

    @property
    def mask(self):
        """The mask of the zone."""
        return self._mask

    @property
    def successors(self):
        """A list of zones connected to zone at the current time step."""
        return self._successors

    @property
    def area(self):
        """Zone area calculated as the sum of cell area at grid nodes."""
        cell_areas = self._controller._grid.cell_area_at_node
        return cell_areas[self._mask].sum()

    def _get_largest_intersection(self, zones, exclusions=None):
        """Get the zone in `zones` that shares the most nodes with this zone.

        Parameters
        ----------
        zones : Zone list
            The candidate zones.
        exclusions : iterable of Zones, optional
            Candidates that must not be selected.

        Returns
        -------
        Zone
            The candidate with the greatest count of nodes shared with this
            zone; the first one in `zones` wins a tie. This zone itself is
            returned when `zones` is empty or all candidates are excluded.
        """
        excluded = set() if exclusions is None else set(exclusions)

        # Excluded candidates are scored -1 so they rank below candidates
        # that share no node with this zone (score 0).
        shared_node_counts = [
            -1
            if z in excluded
            else int(np.count_nonzero(np.logical_and(z.mask, self.mask)))
            for z in zones
        ]

        if all(count == -1 for count in shared_node_counts):
            return self

        return zones[np.argmax(shared_node_counts)]