ewatercycle_DA.DA

Class of functions to integrate Data Assimilation into eWaterCycle

Note

Currently assumes a 1D grid (e.g. in get_state_vector); not yet tested on distributed models.

Module Contents

ewatercycle_DA.DA.LOADED_METHODS: dict[str, Any]
ewatercycle_DA.DA.KNOWN_WORKING_MODELS_DA: list[str] = ['HBV', 'HBVLocal', 'Lorenz', 'LorenzLocal', 'ParallelisationSleep']
ewatercycle_DA.DA.KNOWN_WORKING_MODELS_DA_HYDROLOGY: list[str] = ['HBV', 'HBVLocal']
ewatercycle_DA.DA.TLAG_MAX = 100
ewatercycle_DA.DA.load_models(loaded_models) dict[str, Any]

Loads the models found in the user's installation.

Note

To load other local models:

from ewatercycle_DA import DA
from model import LocalModel

ensemble = DA.Ensemble(N=100)
ensemble.loaded_models.update({'LocalModel': LocalModel})

The LorenzLocal model is only added in this way as it is likely useful to benchmark DA schemes.

class ewatercycle_DA.DA.Ensemble

Bases: pydantic.BaseModel

Class for running data assimilation in eWaterCycle

Parameters:
  • N – Number of ensemble members

  • dask_config – Dictionary to pass to dask.config.set(); see the dask docs. Defaults to the same number of workers as physical processors. Use with care: too many workers will overload the infrastructure.

  • parallel – Whether to run in parallel. Defaults to True, except for local Python models, which in most cases do not really benefit from a parallel implementation.

  • num_worker_init – How many instances to initialize at once. Starting up many Docker containers at once can be demanding; set to 1 for stability. This deliberately overrides the value set in dask_config.

ensemble_method

method used for data assimilation

ensemble_method_name

name of the method used for data assimilation (needed for method-specific functions)

ensemble_list

list containing ensembleMembers

observed_variable_name

Name of the observed value: often Q but could be anything

measurement_operator

Function or list of Functions which maps the state vector to the measurement space: i.e. extracts the wanted value for comparison by the DA scheme from the state vector. (Also known as H)

observations

NetCDF file containing observations

lst_models_name

list containing a set of all the model names: i.e. to run checks

logger

list used to collect messages for debugging; not the best way to debug or log, but it works for now

config_specific_storage

used by the config_specific_actions

Note

Run setup and initialize before using other functions

N: int
dask_config: dict
parallel: bool = True
num_worker_init: int = 1
ensemble_list: list = []
ensemble_method: Any | None
ensemble_method_name: str | None
observed_variable_name: str | None
measurement_operator: Any | list | None
observations: Any | None
lst_models_name: list = []
logger: list = []
config_specific_storage: Any | None
loaded_models: dict[str, Any]
loaded_models
setup() None

Creates a set of empty ensemble member instances. This allows further customisation, e.g. different models in one ensemble.

generate_forcing(forcing_class: list[Any] | Any, list_forcing_args: list[dict]) list[Any]

Generates forcing in parallel.

Parameters:
  • forcing_class (list | Any) – (list of) forcing class instance(s). If only one is specified, will pass the different args to same class.

  • list_forcing_args (list[dict]) – list of arguments to be passed to the forcing class(es); these should all be different.

Example

e.g. to investigate the impact of varying a parameter:

import ewatercycle.forcing
from ewatercycle_DA import DA

# n_particles and forcing_path are assumed to be defined earlier
ensemble = DA.Ensemble(N=n_particles)
ensemble.setup()

experiment_start_date = "1997-08-01T00:00:00Z"
experiment_end_date = "1999-03-01T00:00:00Z"
HRU_id = 14138900

# Build ten argument sets that differ only in alpha
forcing_args = []
for i in range(1, 11):
    forcing_args.append(dict(
        start_time=experiment_start_date,
        end_time=experiment_end_date,
        directory=forcing_path,
        camels_file=f'0{HRU_id}_lump_cida_forcing_leap.txt',
        alpha=1.23 + (i / 100),
    ))

ensemble.generate_forcing(ewatercycle.forcing.sources.HBVForcing, forcing_args)

This returns a list of 10 different forcing objects, which can be passed to ensemble.initialize.

check_forcing_input(forcing_class, list_forcing_args) None

Checks that the user input for forcing is correct.
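The individual checks are not listed here. A minimal sketch of what such validation could look like, based only on the generate_forcing parameter description: a single class is reused for every argument set, a list of classes has to pair up with the argument list, and the argument sets should all differ. The name check_forcing_input_sketch and the error messages are hypothetical, not the package's implementation.

```python
from typing import Any


def check_forcing_input_sketch(forcing_class: Any, list_forcing_args: list[dict]) -> None:
    """Hypothetical validation mirroring the documented expectations."""
    if not list_forcing_args:
        raise ValueError("list_forcing_args must contain at least one dict")
    # A list of classes must pair up one-to-one with the argument sets.
    if isinstance(forcing_class, list) and len(forcing_class) != len(list_forcing_args):
        raise ValueError("forcing_class and list_forcing_args differ in length")
    # The argument sets are expected to all be different.
    for i, args in enumerate(list_forcing_args):
        if args in list_forcing_args[:i]:
            raise ValueError(f"forcing args at index {i} duplicate an earlier entry")


# Usage: ten argument sets that differ only in alpha pass the check.
# `object` is a placeholder for a real forcing class.
args = [{"alpha": 1.23 + i / 100} for i in range(1, 11)]
check_forcing_input_sketch(object, args)
```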

static generate_forcing_run(forcing, forcing_args)

Generates forcing given the forcing object and its arguments (with dask).

initialize(model_name, forcing, setup_kwargs, custom_cfg_dir=True) None

Takes the empty ensemble members and launches the model for each ensemble member.

Parameters:
  • model_name (str | list) – currently takes only the model's string name; this may change to a more formal config file later. If you pass a list here, you also need to pass a list of forcing objects, and vice versa.

  • forcing (ewatercycle.base.forcing.DefaultForcing | list) – object or list of objects to give to the model. If you want to vary the forcing, you also need to pass a list of models.

  • setup_kwargs (dict | list) – kwargs dictionary which is passed as model.setup(**setup_kwargs). Warning: ensure your model saves all kwargs to the config. If you want to vary initial parameters, again all arguments should be lists.

  • custom_cfg_dir (Bool)

Note

If you want to pass a list for any one argument, all others should also be lists of the same length.

static gather(*args)
static initialize_run(ensemble, i, custom_cfg_dir)
initialize_da_method(ensemble_method_name: str, hyper_parameters: dict, state_vector_variables: str | list, observation_path: pathlib.Path | None = None, observed_variable_name: str | None = None, measurement_operator: Any | list | None = None)

Similar to initialize but specifically for the data assimilation method

Parameters:
  • ensemble_method_name (str) – name of the data assimilation method for the ensemble

  • hyper_parameters (dict) – dictionary containing hyperparameters for the method, these will vary per method and thus are merely passed on

  • state_vector_variables (Optional[str | list[str]]) –

    Can be set to ‘all’ for known parameters. This is highly model- and scenario-specific and should be implemented separately. Currently known to work for:

    • ewatercycle-HBV

    Can be set to a subset by passing a list of strings naming the variables to include in the state vector.

    Changing to a subset allows you to do interesting things with ensembles, mainly limited to particle filters.

    For example, giving half the particle filters more variables which vary than the others, to see what that does.

  • observation_path (Path) – Path to a NetCDF file containing observations. Ensure the time dimension is of type numpy.datetime64[ns] in order to work well with Ensemble.update.

  • observed_variable_name (str) – Name of the observed value: often Q but could be anything

  • measurement_operator (function | list[function]) – Function or list of functions which maps the state vector to the measurement space, i.e. extracts the wanted value for comparison by the DA scheme from the state vector.

Note

The last three parameters (observation_path, observed_variable_name and measurement_operator) are keyword arguments to keep the code flexible: when running initialize_da_method to set up the ensemble they are normally all needed, but they are not needed if DA is done on the fly.

Note

Assumes memory is large enough to hold the observations in memory or to open them lazily with xarray.
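The measurement operator (H) maps a state vector to the measurement space. As a minimal sketch, assuming the state vector is ordered like the list of state vector variables and that the observed variable is one of its entries, H can simply pick out that entry. The factory make_measurement_operator and the variable names below are placeholders for illustration, not part of the package.

```python
from typing import Callable, Sequence


def make_measurement_operator(
    variable_names: Sequence[str], observed: str
) -> Callable[[Sequence[float]], float]:
    """Build H: pick the observed variable out of a state vector."""
    index = list(variable_names).index(observed)  # raises ValueError if absent

    def H(state_vector: Sequence[float]) -> float:
        return float(state_vector[index])

    return H


# Placeholder variable names; "Q" stands in for the observed discharge.
names = ["storage_a", "storage_b", "Q"]
H = make_measurement_operator(names, "Q")
print(H([12.0, 3.5, 0.8]))  # 0.8
```

Because H only indexes into its argument, the same function works on a plain list and on a 1D numpy array.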

set_state_vector_variables(state_vector_variables: str | list)

Sets state vector variables

Called by Ensemble.initialize_da_method, but can also be called by user to set up get & set state vector if DA is not used.

Parameters:

  • state_vector_variables (Optional[str | list[str]]) –

    Can be set to ‘all’ for known parameters. This is highly model- and scenario-specific and should be implemented separately. Currently known to work for:

    • ewatercycle-HBV

    Can be set to a subset by passing a list of strings naming the variables to include in the state vector.

    Changing to a subset allows you to do interesting things with ensembles, mainly limited to particle filters.

    For example, giving half the particle filters more variables which vary than the others, to see what that does.
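How ‘all’ and an explicit list might be resolved can be sketched as follows. This is an illustration under assumptions: the helper resolve_state_vector_variables is hypothetical, the variable names are placeholders rather than the real HBV variables, and treating a single non-‘all’ string as a one-variable list is a choice made here, not documented behaviour.

```python
def resolve_state_vector_variables(requested, known_variables):
    """Return the variable names to place in the state vector.

    'all' expands to every known variable; a list selects a subset.
    """
    if requested == "all":
        return list(known_variables)
    if isinstance(requested, str):
        requested = [requested]  # assumption: a single name means a one-variable vector
    unknown = [name for name in requested if name not in known_variables]
    if unknown:
        raise ValueError(f"unknown state vector variables: {unknown}")
    return list(requested)


# Placeholder names, not the real HBV variable list.
known = ["storage_a", "storage_b", "param_k", "Q"]
full = resolve_state_vector_variables("all", known)
subset = resolve_state_vector_variables(["storage_a", "Q"], known)

# Give the first half of a 4-member ensemble the full vector, the rest a subset.
per_member = [full if i < 2 else subset for i in range(4)]
```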

static initialize_da_method_run(ensemble, state_vector_variables, i)
finalize(remove_config=True) None

Runs finalize step for all members

Parameters:
  • remove_config (bool) – Removes the created config paths on finalizing. True by default; set to False if you wish to keep them.

static finalize_run(ensemble, i, remove_config)
update(assimilate=False) None

Updates the model for all members.

Parameters:
  • assimilate (bool) – Whether to assimilate in a given timestep. False by default.

Algorithm flow:

  • Gets the state vector, modeled outcome and corresponding observation.

  • Computes the new state vector using the supplied method.

  • Sets the new state vector.

Currently assumes 1D: only one observation per timestep, converted to float.

Todo: think about assimilation windows not being every timestep.
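The flow above can be sketched with plain Python lists standing in for the numpy arrays. This is an illustration only: assimilation_step and nudge_towards_observation are hypothetical names, and the nudging rule is a toy stand-in for a DA method, not one of the schemes shipped with the package.

```python
from typing import Callable, Sequence

StateVectors = list[list[float]]


def assimilation_step(
    state_vectors: StateVectors,
    H: Callable[[Sequence[float]], float],
    observation: float,
    method: Callable[[StateVectors, list[float], float], StateVectors],
) -> StateVectors:
    """One assimilation step over plain lists (stand-ins for numpy arrays)."""
    # 1. Modeled outcome per member, obtained via the measurement operator.
    predicted = [H(z) for z in state_vectors]
    # 2. The supplied method computes the new state vectors.
    new_state_vectors = method(state_vectors, predicted, observation)
    # 3. The caller would now write these back into the models.
    return new_state_vectors


def nudge_towards_observation(state_vectors, predicted, observation, gain=0.5):
    """Toy method: shift the last entry (taken to be observed) towards the observation."""
    return [
        z[:-1] + [z[-1] + gain * (observation - p)]
        for z, p in zip(state_vectors, predicted)
    ]


ensemble_states = [[10.0, 1.0], [12.0, 3.0]]
updated = assimilation_step(
    ensemble_states, lambda z: z[-1], 2.0, nudge_towards_observation
)
print(updated)  # [[10.0, 1.5], [12.0, 2.5]]
```

Passing the method in as a callable keeps the step itself independent of the DA scheme, which matches the idea that hyperparameters and method details are merely passed on.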

static update_run(ensemble, i)
assimilate(ensemble_method_name: str, obs: numpy.ndarray, measurement_operator, hyper_parameters: dict, state_vector_variables: str | list | None)

Similar to calling Ensemble.update(assimilate=True). Intended for advanced users! The assimilate method aims to make on-the-fly data assimilation possible. You only need to define which method, observations and H operator you wish to use. This, however, requires more know-how of the situation.

Parameters:
  • ensemble_method_name (str) – name of the data assimilation method for the ensemble

  • obs (np.ndarray) – array of observations for given timestep.

  • measurement_operator (function | list[function]) – maps the state vector to the measurement space.

  • hyper_parameters (dict) – dictionary of hyperparameters to set to DA method

  • state_vector_variables (str | list[str] | None) –

    Can be set to ‘all’ for known parameters. This is highly model- and scenario-specific and should be implemented separately. Currently known to work for:

    • ewatercycle-HBV

    Can be set to a subset by passing a list of strings naming the variables to include in the state vector.

    Changing to a subset allows you to do interesting things with ensembles, mainly limited to particle filters.

    For example, giving half the particle filters more variables which vary than the others, to see what that does.

    Set to None when called from Ensemble.update(assimilate=True).

get_predicted_values(measurement_operator) numpy.ndarray

Loops over the state vectors and applies the specified measurement operator to obtain the predicted values.
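Since the measurement operator may be a single function or a list of functions, the loop could be sketched as below. This assumes that a list holds one operator per ensemble member; the function predicted_values and the two example operators are hypothetical, and a plain list is returned where the real method returns a numpy.ndarray.

```python
def predicted_values(state_vectors, measurement_operator):
    """Apply one operator to every member, or one operator per member."""
    if callable(measurement_operator):
        operators = [measurement_operator] * len(state_vectors)
    else:
        operators = list(measurement_operator)
        if len(operators) != len(state_vectors):
            raise ValueError("need exactly one measurement operator per ensemble member")
    return [H(z) for H, z in zip(operators, state_vectors)]


def last_entry(z):
    return z[-1]


def first_entry(z):
    return z[0]


states = [[10.0, 1.0], [12.0, 3.0]]
print(predicted_values(states, last_entry))                 # [1.0, 3.0]
print(predicted_values(states, [last_entry, first_entry]))  # [1.0, 12.0]
```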

remove_negative()

If only one model is loaded and it is hydrological, sets negative numbers to positive. Other models, such as the Lorenz model, can be negative.

config_specific_actions(pre_set_state)

Function for actions which are specific to a combination of model with method.

Note

Be specific about when these are used, so that they only occur when wanted.

#1: PF & HBV:

Particle filters replace the full particle, so the lag function also needs to be copied.

If only HBV models are implemented with PF, this will be updated.

If HBV and other models are implemented, this will issue a RuntimeWarning.

If other models are implemented with PF, nothing should happen, just a UserWarning so that you’re aware.
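The three cases for action #1 can be sketched as a small decision function. This is an illustration under assumptions: the method name "PF", the function pf_lag_action and the warning texts are made up here, and the set of HBV model names is taken from KNOWN_WORKING_MODELS_DA_HYDROLOGY.

```python
import warnings

HYDROLOGY_MODELS = {"HBV", "HBVLocal"}  # from KNOWN_WORKING_MODELS_DA_HYDROLOGY


def pf_lag_action(method_name: str, model_names: set[str]) -> bool:
    """Return True when the lag function should be copied along with the particle.

    "PF" as the method name is an assumption made for this sketch.
    """
    if method_name != "PF":
        return False
    hbv = model_names & HYDROLOGY_MODELS
    others = model_names - HYDROLOGY_MODELS
    if hbv and not others:
        return True  # only HBV models: copy the lag function
    if hbv and others:
        warnings.warn("PF with HBV mixed with other models", RuntimeWarning)
    else:
        warnings.warn("PF with non-HBV models: no model-specific action taken", UserWarning)
    return False


print(pf_lag_action("PF", {"HBV", "HBVLocal"}))  # True
```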

get_value(var_name: str) numpy.ndarray

Gets current value of whole ensemble for given variable. Currently assumes 2D; still to be fixed for 1D.

static get_value_run(ensemble, var_name, i)
get_state_vector() numpy.ndarray

Gets current value of whole ensemble for the specified state vector.

Note

Assumes a 1D array, although np.vstack does also work for 2D arrays.

static get_state_vector_run(ensemble, i)
set_value(var_name: str, src: numpy.ndarray) None

Sets current value of whole ensemble for given variable.

Parameters:
  • src (np.ndarray) – size = number of ensemble members x 1 [N x 1]

static set_value_run(ensemble, var_name, src_i, i)
set_state_vector(src: numpy.ndarray) None

Sets current value of whole ensemble for specified state vector

Parameters:

src (np.ndarray) – size = number of ensemble members x number of states in state vector [N x len(z)]. src[0] should return the state vector for the first ensemble member.
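The expected layout of src, one row per ensemble member, can be sketched with nested lists standing in for the N x len(z) numpy array. Both helper functions below are hypothetical and only illustrate the row-per-member convention.

```python
def stack_state_vectors(member_vectors):
    """Stack per-member state vectors into an N x len(z) nested list."""
    lengths = {len(z) for z in member_vectors}
    if len(lengths) > 1:
        raise ValueError("all members need state vectors of equal length")
    return [list(z) for z in member_vectors]


def scatter_state_vectors(src, n_members):
    """Yield (member index, state vector) pairs; src[i] belongs to member i."""
    if len(src) != n_members:
        raise ValueError(f"expected {n_members} rows, got {len(src)}")
    for i, row in enumerate(src):
        yield i, list(row)


members = [[10.0, 1.0], [12.0, 3.0], [11.0, 2.0]]  # N = 3, len(z) = 2
src = stack_state_vectors(members)
print(src[0])  # [10.0, 1.0]
print(dict(scatter_state_vectors(src, n_members=3))[2])  # [11.0, 2.0]
```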

static set_state_vector_run(ensemble, src_i, i)
static load_netcdf(observation_path: pathlib.Path, observed_variable_name: str) xarray.DataArray

Load the observation data file supplied by user

class ewatercycle_DA.DA.EnsembleMember

Bases: pydantic.BaseModel

Class containing ensemble members, meant to be called by the DA.Ensemble class

Parameters:
  • model_name (str | list[str]) – currently takes only the model's string name; this may change to a more formal config file later. If you pass a list here, you also need to pass a list of forcing objects, and vice versa.

  • forcing (ewatercycle.base.forcing.DefaultForcing | list) – object or list of objects to give to the model. If you want to vary the forcing, you also need to pass a list of models.

  • setup_kwargs (dict | list[dict]) – kwargs dictionary which is passed as model.setup(**setup_kwargs). Warning: ensure your model saves all kwargs to the config. If you want to vary initial parameters, again all arguments should be lists.

  • state_vector_variables (Optional[str | list[str]]) –

    Can be set to ‘all’ for known parameters. This is highly model- and scenario-specific and should be implemented separately. Currently known to work for:

    • ewatercycle-HBV

    Can be set to a subset by passing a list of strings naming the variables to include in the state vector.

    Changing to a subset allows you to do interesting things with ensembles, mainly limited to particle filters.

    For example, giving half the particle filters more variables which vary than the others, to see what that does.

    TODO: refactor to be more on the fly.

model

instance of the eWaterCycle model to be used. It must be defined in the loaded_models dictionary in this file, which is a safeguard against misuse.

Type:

ewatercycle.base.model

config

path to config file for the model which the EnsembleMember contains.

Type:

Path

state_vector

numpy array containing the most recently retrieved states

Type:

np.ndarray

variable_names

list of strings naming the variables in the state vector.

Type:

list[str]

loaded_models

dictionary containing model names and their corresponding instances

Type:

dict[str, Any]

model_name: str | None
forcing: ewatercycle.base.forcing.DefaultForcing | None
setup_kwargs: dict | None
state_vector_variables: str | list | None
model: Any | None
config: str | None
cfg_dir: pathlib.Path | None
state_vector: Any | None
variable_names: list[str] | None
loaded_models: dict[str, Any]
setup(cfg_dir) None

Sets up the model with the provided forcing and kwargs, and sets the config file.

initialize() None

Initializes the model with the config file generated in setup

set_state_vector_variable()

Sets the list of variables required to obtain the state vector.

get_value(var_name: str) numpy.ndarray

gets current value of an ensemble member

get_state_vector() numpy.ndarray

Gets the current state vector of the ensemble member. Note: currently assumes a 1D grid, as state_vector is a 1D array. Now checks the shape of data and variables.

set_value(var_name: str, src: numpy.ndarray) None

Sets current value of an ensemble member

set_state_vector(src: numpy.ndarray) None

Sets the current state vector of the ensemble member. TODO: check this. Note: currently assumes a 1D grid, as state_vector is a 1D array.

finalize(remove_config) None

Finalizes the model, closing containers etc. if necessary.

update() None

Updates the model to the next timestep

verify_model_loaded() None

Checks whether specified model is available.
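A check of this kind can be sketched as a lookup in the loaded_models dictionary, which is also where a local model is registered. This is a minimal illustration: verify_model_loaded_sketch, the LocalModel stand-in and the choice of ValueError are assumptions, not the package's code.

```python
from typing import Any


class LocalModel:
    """Stand-in for a user-defined local model class."""


def verify_model_loaded_sketch(model_name: str, loaded_models: dict[str, Any]) -> None:
    """Raise if model_name has not been registered (hypothetical error type)."""
    if model_name not in loaded_models:
        available = ", ".join(sorted(loaded_models)) or "none"
        raise ValueError(f"model {model_name!r} is not loaded; available: {available}")


loaded_models: dict[str, Any] = {}
loaded_models.update({"LocalModel": LocalModel})  # same pattern as in load_models
verify_model_loaded_sketch("LocalModel", loaded_models)  # passes silently
```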

ewatercycle_DA.DA.validate_method(method)

Checks the user-supplied method to ensure it is valid.

ewatercycle_DA.DA.validity_initialize_input(model_name, forcing, setup_kwargs) None

Checks user input to avoid confusion: if model_name is a list, all others must be too.
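The rule can be sketched as follows, assuming the check only concerns whether the three arguments are lists and whether those lists have equal length. The function name, the error messages and the string placeholders used for forcing objects are hypothetical.

```python
def validity_initialize_input_sketch(model_name, forcing, setup_kwargs) -> None:
    """If any argument is a list, require all to be lists of equal length."""
    inputs = {"model_name": model_name, "forcing": forcing, "setup_kwargs": setup_kwargs}
    list_inputs = {name: value for name, value in inputs.items() if isinstance(value, list)}
    if not list_inputs:
        return  # single values: assumed to be shared by every member
    if len(list_inputs) != len(inputs):
        missing = sorted(set(inputs) - set(list_inputs))
        raise ValueError(f"these arguments must also be lists: {missing}")
    if len({len(value) for value in list_inputs.values()}) != 1:
        raise ValueError("all list arguments must have the same length")


# Strings stand in for forcing objects here.
validity_initialize_input_sketch("HBV", "forcing", {"k": 1})
validity_initialize_input_sketch(["HBV", "HBV"], ["f1", "f2"], [{"k": 1}, {"k": 2}])
```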