Source code for landlab.io.esri_ascii

#! /usr/bin/env python
"""Read/write data from an ESRI ASCII file into a RasterModelGrid.

ESRI ASCII functions
++++++++++++++++++++

.. autosummary::

    ~dump
    ~lazy_load
    ~lazy_loads
    ~load
    ~loads
    ~parse
"""
from __future__ import annotations

import io
from collections import OrderedDict
from collections.abc import Callable
from typing import Literal
from typing import NamedTuple
from typing import TextIO
from typing import overload

import numpy as np
from numpy.typing import ArrayLike
from numpy.typing import NDArray

from landlab.grid.raster import RasterModelGrid
from landlab.layers.eventlayers import _valid_keywords_or_raise


[docs] class EsriAsciiError(Exception): pass
[docs] class BadHeaderError(EsriAsciiError):
[docs] def __init__(self, msg: str) -> None: self._msg = msg
def __str__(self) -> str: return self._msg
[docs] def dump( grid: RasterModelGrid, stream: TextIO | None = None, at: Literal["node", "patch", "corner", "cell"] = "node", name: str | None = None, ) -> str | None: """Dump a grid field to ESRII ASCII format. Parameters ---------- grid : A Landlab grid. stream : A ``file_like`` object to write to. If ``None``, return a string containing the serialized grid. at : Where the field to be written is located on the grid. name : The name of the field to be written. If ``None``, only the header information will be written. Returns ------- : The grid field in ESRI ASCII format or ``None`` if a ``file_like`` object was provided. Examples -------- >>> from landlab import RasterModelGrid >>> from landlab.io import esri_ascii >>> grid = RasterModelGrid((3, 4), xy_spacing=2.0) >>> print(esri_ascii.dump(grid, at="node")) NROWS 3 NCOLS 4 CELLSIZE 2.0 XLLCENTER 0.0 YLLCENTER 0.0 NODATA_VALUE -9999 >>> print(esri_ascii.dump(grid, at="cell")) NROWS 1 NCOLS 2 CELLSIZE 2.0 XLLCENTER 2.0 YLLCENTER 2.0 NODATA_VALUE -9999 """ grid = _validate_grid(grid) shape = np.asarray(grid.shape) if at in ("corner", "patch"): shape -= 1 elif at == "cell": shape -= 2 shift = _get_lower_left_shift(at=at, ref="center") header = _Header( nrows=shape[0], ncols=shape[1], xllcenter=grid.xy_of_lower_left[0] - grid.dx * shift, yllcenter=grid.xy_of_lower_left[1] - grid.dx * shift, cellsize=grid.dx, ) lines = [str(header)] if name: data = getattr(grid, f"at_{at}")[name] kwds = {"comments": ""} if data.dtype.kind in ("i", "u"): kwds["fmt"] = "%d" with io.StringIO() as fp: np.savetxt(fp, np.flipud(data.reshape(shape)), **kwds) lines.append(fp.getvalue()) content = "".join(lines) if stream is None: return content else: stream.write(content) return None
[docs] def load( stream: TextIO, at: str = "node", name: str | None = None, out: RasterModelGrid | None = None, ) -> RasterModelGrid: """Parse a RasterModelGrid from a ESRI ASCII format stream. Parameters ---------- stream : file_like A text stream in ESRI ASCII format. at : {'node', 'patch', 'corner', 'cell'}, optional Location on the grid where data are placed. name : str, optional Name of the newly-created field. If `name` is not provided, the grid will be created but the data not added as a field. out : RasterModelGrid, optional Place the data from the file onto an existing grid. If not provided, create a new grid. Returns ------- : A newly-created ``RasterModelGrid`` with, optionaly, the data added as a field (if `name` was provided). """ return loads(stream.read(), at=at, name=name, out=out)
[docs] def loads( s: str, at: str = "node", name: str | None = None, out: RasterModelGrid | None = None, ) -> RasterModelGrid: """Parse a RasterModelGrid from a ESRI ASCII formatted string. Parameters ---------- s : str A string in ESRI ASCII format. at : {'node', 'patch', 'corner', 'cell'}, optional Location on the grid where data are placed. name : str, optional Name of the newly-created field. If `name` is not provided, the grid will be created but the data not added to the grid as a field. out : RasterModelGrid, optional Place the data from the file onto an existing grid. If not provided, create a new grid. Returns ------- RasterModelGrid A newly-created ``RasterModelGrid`` with, optionaly, the data added as a field (if `name` was provided). Examples -------- >>> from landlab.io import esri_ascii >>> contents = ''' ... NROWS 1 ... NCOLS 2 ... XLLCORNER -2.0 ... YLLCORNER 4.0 ... CELLSIZE 2.0 ... NODATA_VALUE -9999 ... '''.lstrip() >>> grid = esri_ascii.loads(contents, at="cell") >>> grid.shape (3, 4) >>> grid.xy_of_lower_left (-3.0, 3.0) >>> grid.spacing (2.0, 2.0) >>> contents = ''' ... NROWS 1 ... NCOLS 2 ... XLLCENTER -2.0 ... YLLCENTER 4.0 ... CELLSIZE 2.0 ... NODATA_VALUE -9999 ... 10 11 ... '''.lstrip() >>> grid = esri_ascii.loads(contents, at="cell", name="foo") >>> grid.shape (3, 4) >>> grid.xy_of_lower_left (-4.0, 2.0) >>> grid.spacing (2.0, 2.0) >>> grid.at_cell["foo"] array([10., 11.]) >>> contents = ''' ... NROWS 3 ... NCOLS 4 ... XLLCENTER 0.0 ... YLLCENTER 0.0 ... CELLSIZE 1.0 ... NODATA_VALUE -9999 ... 1 2 3 4 ... 5 6 7 8 ... 9 10 11 12 ... '''.lstrip() >>> grid = esri_ascii.loads(contents, at="node", name="foo") >>> contents = ''' ... NROWS 1 ... NCOLS 2 ... XLLCENTER 1.0 ... YLLCENTER 1.0 ... CELLSIZE 1.0 ... NODATA_VALUE -9999 ... 10 20 ... '''.lstrip() >>> grid = esri_ascii.loads(contents, at="cell", name="foo", out=grid) >>> grid.at_node["foo"].reshape(grid.shape) array([[ 9., 10., 11., 12.], [ 5., 6., 7., 8.], [ 1., 2., 3., 4.]]) >>> grid.at_cell["foo"] array([10., 20.]) """ if name is None: info = parse(s, with_data=False) else: info, data = parse(s, with_data=True) header = _Header(**info) shape = np.asarray(header.shape) if at in ("corner", "patch"): shape += 1 elif at == "cell": shape += 2 shift = header.cell_size * _get_lower_left_shift(at=at, ref=header.lower_left_ref) xy_of_lower_left = np.asarray(header.lower_left) + shift if out is None: grid = RasterModelGrid( shape, xy_spacing=header.cell_size, xy_of_lower_left=xy_of_lower_left ) else: grid = _validate_grid( out, shape, xy_spacing=header.cell_size, xy_of_lower_left=xy_of_lower_left ) if name is not None: getattr(grid, f"at_{at}")[name] = data return grid
def _header_to_grid_args(info: dict[str, int | float], at: str): header = _Header(**info) shape = np.asarray(header.shape) if at in ("corner", "patch"): shape += 1 elif at == "cell": shape += 2 shift = header.cell_size * _get_lower_left_shift(at=at, ref=header.lower_left_ref) # xy_of_lower_left = np.asarray(header.lower_left) + shift return { "shape": tuple(shape), "xy_spacing": (header.cell_size, header.cell_size), "xy_of_lower_left": tuple(np.asarray(header.lower_left) + shift), }
[docs] class RasterGridArgs(NamedTuple): shape: tuple[int, int] xy_spacing: tuple[float, float] xy_of_lower_left: tuple[float, float]
@overload def lazy_load(stream: TextIO, at: str = "node", name: None = ...) -> RasterGridArgs: ... @overload def lazy_load( stream: TextIO, at: str = "node", name: str = ... ) -> tuple[RasterGridArgs, NDArray]: ...
[docs] def lazy_load( stream: TextIO, at: str = "node", name: str | None = None, ) -> RasterGridArgs | tuple[RasterGridArgs, NDArray]: """Parse a RasterModelGrid from a ESRI ASCII format stream. Parameters ---------- stream : file_like A text stream in ESRI ASCII format. at : {'node', 'patch', 'corner', 'cell'}, optional Location on the grid where data are placed. name : str, optional Name of the newly-created field. If `name` is not provided, the grid will be created but the data not added as a field. Returns ------- RasterGridArgs The header metadata NDArray, optional The data as a numpy array. """ return lazy_loads(stream.read(), at=at, name=name)
@overload def lazy_loads(s: str, at: str = "node", name: None = ...) -> RasterGridArgs: ... @overload def lazy_loads( s: str, at: str = "node", name: str = ... ) -> tuple[RasterGridArgs, NDArray]: ...
[docs] def lazy_loads( s: str, at: str = "node", name: str | None = None, out: RasterModelGrid | None = None, ) -> RasterGridArgs | tuple[RasterGridArgs, NDArray]: """Parse a ESRI ASCII formatted string. Parameters ---------- s : str A string in ESRI ASCII format. at : {'node', 'patch', 'corner', 'cell'}, optional Location on the grid where data are placed. name : str, optional Name of the newly-created field. If `name` is not provided, the grid will be created but the data not added to the grid as a field. out : RasterModelGrid, optional Place the data from the file onto an existing grid. If not provided, create a new grid. Returns ------- RasterGridArgs The header metadata NDArray, optional The data as a numpy array. Examples -------- >>> from landlab import RasterModelGrid >>> from landlab.io import esri_ascii >>> contents = ''' ... NROWS 1 ... NCOLS 2 ... XLLCORNER -2.0 ... YLLCORNER 4.0 ... CELLSIZE 2.0 ... NODATA_VALUE -9999 ... 10 20 ... '''.lstrip() >>> args, data = esri_ascii.lazy_loads(contents, at="cell", name="foo") >>> args RasterGridArgs(shape=(3, 4), xy_spacing=(2.0, 2.0), xy_of_lower_left=(-3.0, 3.0)) >>> data array([10., 20.]) >>> grid = RasterModelGrid(*args) >>> grid.at_cell["foo"] = data >>> contents = ''' ... NROWS 3 ... NCOLS 4 ... XLLCORNER -3.0 ... YLLCORNER 3.0 ... CELLSIZE 2.0 ... NODATA_VALUE -9999 ... 1 2 3 4 ... 5 6 7 8 ... 9 10 11 12 ... '''.lstrip() >>> args, data = esri_ascii.lazy_loads(contents, at="node", name="foo", out=grid) >>> args RasterGridArgs(shape=(3, 4), xy_spacing=(2.0, 2.0), xy_of_lower_left=(-3.0, 3.0)) >>> data array([ 9., 10., 11., 12., 5., 6., 7., 8., 1., 2., 3., 4.]) >>> grid.at_cell["foo"] array([10., 20.]) >>> grid.at_node["foo"] array([ 9., 10., 11., 12., 5., 6., 7., 8., 1., 2., 3., 4.]) """ if name is None: info = parse(s, with_data=False) else: info, data = parse(s, with_data=True) args = RasterGridArgs(**_header_to_grid_args(info, at=at)) if name is not None: if out is not None: grid = _validate_grid(out, *args) getattr(grid, f"at_{at}")[name] = data return args, data else: return args
@overload def parse(s: str, with_data: Literal[False] = ...) -> dict[str, int | float]: ... @overload def parse( s: str, with_data: Literal[True] = ... ) -> tuple[dict[str, int | float], NDArray]: ...
[docs] def parse( s: str, with_data: bool = False ) -> dict[str, int | float] | tuple[dict[str, int | float], NDArray]: """Parse a ESRI ASCII formatted string. Parameters ---------- s : str String to parse. with_data : bool, optional Return the data as a numpy array, otherwise, just return the header. Returns ------- dict or tuple of (dict, ndarray) The header metadata and, optionally, the data. Examples -------- >>> from landlab.io import esri_ascii >>> contents = ''' ... NROWS 2 ... NCOLS 3 ... XLLCORNER -2.0 ... YLLCORNER 4.0 ... CELLSIZE 2.0 ... NODATA_VALUE -9999 ... 10 20 30 ... 40 50 60 ... '''.lstrip() >>> list(esri_ascii.parse(contents).items()) [('nrows', 2), ('ncols', 3), ('xllcorner', -2.0), ('yllcorner', 4.0), ('cellsize', 2.0), ('nodata_value', -9999.0)] >>> info, data = esri_ascii.parse(contents, with_data=True) >>> data array([40., 50., 60., 10., 20., 30.]) """ lines = s.splitlines() start_of_data: int | None = 0 for lineno, line in enumerate(lines): if not _Header.is_header_line(line): start_of_data = lineno break else: start_of_data = None if start_of_data is None: header, body = lines, [] else: header, body = lines[:start_of_data], lines[start_of_data:] if not header: raise EsriAsciiError("missing header") if with_data and not body: raise EsriAsciiError("missing data") info = OrderedDict(_Header.parse_header_line(line) for line in header) validated_info = _Header(**info) if with_data: data = np.loadtxt([" ".join(body)]).reshape(validated_info.shape) return info, np.flipud(data).reshape((-1,)) else: return info
def _validate_grid( grid: RasterModelGrid, shape: ArrayLike | None = None, xy_spacing: float | tuple[float, float] | None = None, xy_of_lower_left: tuple[float, float] | None = None, ) -> RasterModelGrid: if not isinstance(grid, RasterModelGrid): raise EsriAsciiError( "Not a RasterModelGrid. Only RasterModelGrids can be expressed in" " ESRI ASCII format." ) if grid.dx != grid.dy: raise EsriAsciiError(f"x and y spacing must be equal ({grid.dx} != {grid.dy}).") if shape is not None and not np.all(np.equal(grid.shape, shape)): raise EsriAsciiError(f"Grid shape mismatch ({grid.shape} != {shape}).") if xy_spacing is not None and not np.all(np.equal(grid.spacing, xy_spacing)): raise EsriAsciiError(f"Grid spacing mismatch ({grid.dx} != {xy_spacing}).") if xy_of_lower_left is not None and not np.all( np.equal(grid.xy_of_lower_left, xy_of_lower_left) ): raise EsriAsciiError( f"Grid lower-left mismatch ({grid.xy_of_lower_left} != {xy_of_lower_left})." ) return grid def _get_lower_left_shift(at="node", ref="center"): """Calculate the shift from the lower left of a grid to ESRI ASCII. Parameters ---------- at : {'node', 'patch', 'corner', 'cell'}, optional Grid location where data are placed. ref : {'center', 'corner'}, optional Reference location with respect to the ESRI ASCII cell. Returns ------- float The unit shift between the lower left of an ESRI ASCII field and a ``RasterModelGrid``. """ if at == "node" or (at == "patch" and ref == "corner"): return 0.0 elif ( at == "corner" or (at == "patch" and ref == "center") or (at == "cell" and ref == "corner") ): return -0.5 elif at == "cell" and ref == "center": return -1.0 raise RuntimeError(f"unreachable code ({at!r}, {ref!r}") class _Header: required_keys = frozenset(("ncols", "nrows", "cellsize")) optional_keys = frozenset( ("xllcorner", "xllcenter", "yllcorner", "yllcenter", "nodata_value") ) def __init__(self, **kwds): _valid_keywords_or_raise( kwds, required=_Header.required_keys, optional=_Header.optional_keys ) self._shape = self._validate_shape(kwds["nrows"], kwds["ncols"]) self._cell_size = self._validate_cell_size(kwds["cellsize"]) self._nodata_value = kwds.get("nodata_value", -9999) lower_left = { k: kwds[k] for k in ("xllcorner", "xllcenter", "yllcorner", "yllcenter") if k in kwds } self._lower_left, self._lower_left_ref = self._validate_lower_left(**lower_left) @staticmethod def parse_header_line(line: str) -> tuple[str, int | float]: """Parse a header line into a key/value pair.""" try: key, value = line.split(maxsplit=1) except ValueError: raise BadHeaderError( f"header line must contain a key/value pair ({line!r})" ) from None else: key = key.lower() convert: dict[str, Callable[[str], int | float]] = {"nrows": int, "ncols": int} try: return key, convert.get(key, float)(value) except ValueError: raise BadHeaderError( f"unable to convert header value to a number ({line!r})" ) from None @staticmethod def is_header_line(line: str) -> bool: """Check if a string is a possible header line.""" try: key, value = line.split(maxsplit=1) except ValueError: return False else: return key.lower() in (_Header.required_keys | _Header.optional_keys) @property def shape(self) -> tuple[int, int]: return self._shape @staticmethod def _validate_shape(n_rows: int, n_cols: int) -> tuple[int, int]: if not isinstance(n_rows, (int, np.integer)) or n_rows <= 0: raise BadHeaderError(f"n_rows must be a positive integer ({n_rows!r})") if not isinstance(n_cols, (int, np.integer)) or n_cols <= 0: raise BadHeaderError(f"n_cols must be a positive integer ({n_cols!r})") return n_rows, n_cols @property def lower_left(self) -> tuple[float, float]: return self._lower_left @property def lower_left_ref(self) -> str: return self._lower_left_ref @staticmethod def _validate_lower_left(cell_size=0.0, **kwds) -> tuple[tuple[float, float], str]: if set(kwds) == {"xllcorner", "yllcorner"}: lower_left = kwds["xllcorner"], kwds["yllcorner"] ref = "corner" elif set(kwds) == {"xllcenter", "yllcenter"}: lower_left = kwds["xllcenter"], kwds["yllcenter"] ref = "center" else: raise BadHeaderError( "header must contain one, and only one, of the pairs" " ('xllcenter', 'yllcenter') or ('xllcorner', 'yllcorner')" f" (got {', '.join(repr(k) for k in sorted(set(kwds)))})" ) return lower_left, ref @property def cell_size(self) -> float: return self._cell_size @staticmethod def _validate_cell_size(cell_size: float) -> float: if cell_size <= 0.0: raise BadHeaderError(f"cell size must be greater than zero ({cell_size})") return float(cell_size) @property def nodata(self) -> int | float: return self._nodata_value def __str__(self) -> str: lines = ( f"NROWS {self.shape[0]}", f"NCOLS {self.shape[1]}", f"CELLSIZE {self.cell_size}", f"XLL{self.lower_left_ref.upper()} {self.lower_left[0]}", f"YLL{self.lower_left_ref.upper()} {self.lower_left[1]}", f"NODATA_VALUE {self.nodata}", ) return "\n".join(lines) + "\n" def __repr__(self) -> str: kwds = { "nrows": self.shape[0], "ncols": self.shape[1], "cellsize": self.cell_size, f"xll{self.lower_left_ref}": self.lower_left[0], f"yll{self.lower_left_ref}": self.lower_left[1], "nodata_value": self.nodata, } args = [f"{k}={v!r}" for k, v in kwds.items()] return f"_Header({', '.join(args)})"