ewatercycle_DA.DA
Class of functions to integrate Data Assimilation into eWaterCycle
Note
assumes a 1D grid currently (e.g. in get_state_vector) - not yet tested on distributed models.
Module Contents
- ewatercycle_DA.DA.LOADED_METHODS: dict[str, Any]
- ewatercycle_DA.DA.KNOWN_WORKING_MODELS_DA: list[str] = ['HBV', 'HBVLocal', 'Lorenz', 'LorenzLocal', 'ParallelisationSleep']
- ewatercycle_DA.DA.KNOWN_WORKING_MODELS_DA_HYDROLOGY: list[str] = ['HBV', 'HBVLocal']
- ewatercycle_DA.DA.TLAG_MAX = 100
- ewatercycle_DA.DA.load_models(loaded_models) dict[str, Any]
Loads models found in user install
Note
To load other local models:
    from ewatercycle_DA import DA
    from model import LocalModel

    ensemble = DA.Ensemble(N=100)
    ensemble.loaded_models.update({'LocalModel': LocalModel})
The LorenzLocal model is only added in this way as it is likely useful to benchmark DA schemes.
- class ewatercycle_DA.DA.Ensemble
Bases: pydantic.BaseModel
Class for running data assimilation in eWaterCycle
- Parameters:
N – Number of ensemble members
dask_config – Dictionary to pass to dask.config.set(); see the dask docs. Defaults to the same number of workers as physical processors. Use with care: too many workers will overload the infrastructure.
parallel – Whether to run in parallel. Defaults to True, except for local Python models, which in most cases do not benefit from a parallel implementation.
num_worker_init – How many instances initialize at once. Starting up many Docker containers at once can be demanding. Set to 1 for stability. This deliberately overrides the value set in dask_config.
- ensemble_method
method used for data assimilation
- ensemble_method_name
name of the method used for data assimilation (needed for method-specific actions)
- ensemble_list
list containing ensembleMembers
- observed_variable_name
Name of the observed value: often Q but could be anything
- measurement_operator
Function or list of Functions which maps the state vector to the measurement space: i.e. extracts the wanted value for comparison by the DA scheme from the state vector. (Also known as H)
- observations
NetCDF file containing observations
- lst_models_name
list containing a set of all the model names: i.e. to run checks
- logger
list to debug issues, this isn’t the best way to debug/log but works for me/now
- config_specific_storage
used by the config_specific_actions
Note
Run setup and initialize before using other functions.
- N: int
- dask_config: dict
- parallel: bool = True
- num_worker_init: int = 1
- ensemble_list: list = []
- ensemble_method: Any | None
- ensemble_method_name: str | None
- observed_variable_name: str | None
- measurement_operator: Any | list | None
- observations: Any | None
- lst_models_name: list = []
- logger: list = []
- config_specific_storage: Any | None
- loaded_models: dict[str, Any]
- loaded_models
- setup() None
Creates a set of empty Ensemble member instances. This allows further customisation, e.g. different models in one ensemble.
- generate_forcing(forcing_class: list[Any] | Any, list_forcing_args: list[dict]) list[Any]
Generates forcing in parallel.
- Parameters:
forcing_class (list | Any) – (list of) forcing class instance(s). If only one is specified, will pass the different args to same class.
list_forcing_args (list[dict]) – list of arguments to be passed to the forcing class(es); these should all be different.
Example
e.g. to investigate the impact of varying a parameter:
    ensemble = DA.Ensemble(N=n_particles)
    ensemble.setup()

    experiment_start_date = "1997-08-01T00:00:00Z"
    experiment_end_date = "1999-03-01T00:00:00Z"
    HRU_id = 14138900

    forcing_args = []
    for i in range(1, 11):
        forcing_args.append(dict(
            start_time = experiment_start_date,
            end_time = experiment_end_date,
            directory = forcing_path,
            camels_file = f'0{HRU_id}_lump_cida_forcing_leap.txt',
            alpha = 1.23 + (i/100)
        ))

    import ewatercycle.forcing
    ensemble.generate_forcing(ewatercycle.forcing.sources.HBVForcing, forcing_args)
This returns 10 different forcing objects in a list which can be passed to ensemble.initialize
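The fan-out described above can be sketched without eWaterCycle installed. This is a minimal illustration, not the package's implementation: `ToyForcing` and `generate_forcing_sketch` are hypothetical names, and a standard-library thread pool stands in for dask.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass
class ToyForcing:
    """Hypothetical stand-in for a forcing class such as HBVForcing."""
    start_time: str
    end_time: str
    alpha: float


def generate_forcing_sketch(forcing_class, list_forcing_args):
    """Build one forcing object per argument dict, in parallel."""
    # A single class is reused for every argument dict.
    if not isinstance(forcing_class, list):
        forcing_class = [forcing_class] * len(list_forcing_args)
    if len(forcing_class) != len(list_forcing_args):
        raise ValueError("Need one forcing class per argument dict.")
    with ThreadPoolExecutor() as pool:
        # Executor.map() returns results in input order.
        return list(pool.map(lambda pair: pair[0](**pair[1]),
                             zip(forcing_class, list_forcing_args)))


forcing_args = [
    dict(start_time="1997-08-01T00:00:00Z",
         end_time="1999-03-01T00:00:00Z",
         alpha=1.23 + i / 100)
    for i in range(1, 11)
]
forcings = generate_forcing_sketch(ToyForcing, forcing_args)
```

Because the results keep the order of `forcing_args`, the i-th forcing object can be paired with the i-th ensemble member.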
- check_forcing_input(forcing_class, list_forcing_args) None
Check user input for forcing is correct
- static generate_forcing_run(forcing, forcing_args)
Generate forcing given obj & args (with dask).
- initialize(model_name, forcing, setup_kwargs, custom_cfg_dir=True) None
Takes empty Ensemble members and launches the model for given ensemble member
- Parameters:
model_name (str | list) – takes only the model's string name for now; change to more formal config file later? Should you pass a list here, you also need a list of forcing objects & vice versa.
forcing (ewatercycle.base.forcing.DefaultForcing | list) – object or list of objects to give to the model. Should you want to vary the forcing, you also need to pass a list of models.
setup_kwargs (dict | list) – kwargs dictionary which can be passed as model.setup(**setup_kwargs). UserWarning: ensure your model saves all kwargs to the config. Should you want to vary initial parameters, again all should be a list.
custom_cfg_dir (bool)
Note
If you want to pass a list for any one variable, all others should be lists too of the same length.
- static gather(*args)
- static initialize_run(ensemble, i, custom_cfg_dir)
- initialize_da_method(ensemble_method_name: str, hyper_parameters: dict, state_vector_variables: str | list, observation_path: pathlib.Path | None = None, observed_variable_name: str | None = None, measurement_operator: Any | list | None = None)
Similar to initialize but specifically for the data assimilation method
- Parameters:
ensemble_method_name (str) – name of the data assimilation method for the ensemble
hyper_parameters (dict) – dictionary containing hyperparameters for the method, these will vary per method and thus are merely passed on
state_vector_variables (Optional[str | list[str]]) – can be set to 'all' for known parameters; this is highly model and scenario specific & should be implemented separately. Currently known to work for:
ewatercycle-HBV
…
Can be set by passing a list of strings naming the variables to include in the state vector.
Changing to a subset allows you to do interesting things with ensembles: mainly limited to particle filters.
For example giving half the particle filters more variables which vary than others - see what that does.
observation_path (Path) – Path to a NetCDF file containing observations. Ensure the time dimension is of type numpy.datetime64[ns] in order to work well with Ensemble.update.
observed_variable_name (str) – Name of the observed value: often Q but could be anything
measurement_operator (function | list[functions]) – if not specified: by default 'all' known parameters, can be a subset of all by passing a list containing strings of variable to include in the state vector. Should you want to vary initial parameters, again all should be a list.
Note
The following three are Keyword Args to make the code more flexible: when running the initialize_da_method to set up the ensemble normally these are all needed. They are separate as these aren’t needed if DA is done on the fly.
Keyword Args:
Note
Assumes memory is large enough to hold the observations (in memory or lazily opened with xarray).
- set_state_vector_variables(state_vector_variables: str | list)
Sets state vector variables
Called by Ensemble.initialize_da_method, but can also be called by user to set up get & set state vector if DA is not used.
- Parameters:
state_vector_variables (Optional[str | list[str]]) – can be set to 'all' for known parameters; this is highly model and scenario specific & should be implemented separately. Currently known to work for:
ewatercycle-HBV
…
Can be set by passing a list of strings naming the variables to include in the state vector.
Changing to a subset allows you to do interesting things with ensembles: mainly limited to particle filters.
For example giving half the particle filters more variables which vary than others - see what that does.
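As a rough illustration of how 'all' versus a list of names could select what ends up in the state vector, consider the sketch below. It is not the package's code: `build_state_vector` is a hypothetical helper, and the variable names and values are illustrative placeholders only.

```python
def build_state_vector(values, state_vector_variables, known_variables):
    """Return (names, vector) for the requested variables."""
    if state_vector_variables == "all":
        # 'all' expands to every variable known for this model.
        names = list(known_variables)
    elif isinstance(state_vector_variables, list):
        names = list(state_vector_variables)
    else:
        raise ValueError("Pass 'all' or a list of variable names.")
    unknown = [n for n in names if n not in values]
    if unknown:
        raise KeyError(f"Unknown variables: {unknown}")
    return names, [values[n] for n in names]


# Illustrative placeholder names and values (assumptions, not real output).
known = ["Si", "Su", "Sf", "Ss"]
values = {"Si": 0.1, "Su": 12.0, "Sf": 1.5, "Ss": 30.0}

all_names, all_vector = build_state_vector(values, "all", known)
sub_names, sub_vector = build_state_vector(values, ["Su", "Ss"], known)
```

Giving different members different subsets then amounts to calling the helper with a different list per member.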
- static initialize_da_method_run(ensemble, state_vector_variables, i)
- finalize(remove_config=True) None
Runs finalize step for all members
- Optional Arg:
- remove_config (bool) = True: removes the created config paths on finalizing by default. Set to False if you wish to keep them.
- static finalize_run(ensemble, i, remove_config)
- update(assimilate=False) None
Updates model for all members.
- Parameters:
assimilate (bool) – Whether to assimilate in a given timestep. False by default.
- Algorithm flow:
Gets the state vector, modeled outcome and corresponding observation
Computes new state vector using supplied method
Then set the new state vector
Currently assumed 1D: only one observation per timestep converted to float
Todo: think about assimilation windows not being every timestep
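The flow above (get state vectors, compare predictions with the observation, compute and set new state vectors) can be sketched with a generic weight-and-resample particle filter step. This is an assumption-laden illustration, not the scheme shipped with this package: `particle_filter_step`, the Gaussian likelihood and `sigma` are all hypothetical choices, and plain lists stand in for NumPy arrays.

```python
import math
import random


def particle_filter_step(state_vectors, H, observation, sigma, rng):
    """One assimilation step: weight members by likelihood, then resample."""
    # 1. Map each state vector to measurement space.
    predicted = [H(z) for z in state_vectors]
    # 2. Gaussian likelihood of the single observation per prediction.
    weights = [math.exp(-0.5 * ((observation - p) / sigma) ** 2)
               for p in predicted]
    if sum(weights) == 0.0:
        # All likelihoods underflowed; fall back to uniform weights.
        weights = [1.0] * len(weights)
    # 3. Resample with replacement; copy so members stay independent.
    n = len(state_vectors)
    chosen = rng.choices(range(n), weights=weights, k=n)
    return [list(state_vectors[i]) for i in chosen]


rng = random.Random(42)
ensemble = [[float(q), 1.0] for q in range(10)]  # state: [Q, other]


def H(z):
    """Measurement operator: extract the observed variable."""
    return z[0]


new_ensemble = particle_filter_step(ensemble, H, observation=4.0,
                                    sigma=0.5, rng=rng)
```

After the step, the new state vectors would be written back to the members, which is the "set the new state vector" stage of the flow.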
- static update_run(ensemble, i)
- assimilate(ensemble_method_name: str, obs: numpy.ndarray, measurement_operator, hyper_parameters: dict, state_vector_variables: str | list | None)
Similar to calling Ensemble.update(assimilate=True). Intended for advanced users! The assimilate method aims to make on-the-fly data assimilation possible. You only need to define which method, observations and H operator you wish to use. This, however, requires more know-how of the situation.
- Parameters:
ensemble_method_name (str) – name of the data assimilation method for the ensemble
obs (np.ndarray) – array of observations for given timestep.
measurement_operator (function | list[functions]) – maps the state vector to the measurement space
hyper_parameters (dict) – dictionary of hyperparameters to set for the DA method
state_vector_variables (str | list[str] | None) – can be set to 'all' for known parameters; this is highly model and scenario specific & should be implemented separately. Currently known to work for:
ewatercycle-HBV
…
Can be set by passing a list of strings naming the variables to include in the state vector.
Changing to a subset allows you to do interesting things with ensembles: mainly limited to particle filters.
For example giving half the particle filters more variables which vary than others - see what that does.
Set to None when called from Ensemble.update(assimilate=True).
- get_predicted_values(measurement_operator) numpy.ndarray
Loops over the state vectors and applies the specified measurement operator to obtain the predicted values.
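A minimal sketch of this loop is given below. It assumes that a list of operators means one operator per ensemble member, which is not confirmed by this page; `get_predicted_values_sketch` is a hypothetical name and lists stand in for NumPy arrays.

```python
def get_predicted_values_sketch(state_vectors, measurement_operator):
    """Apply the measurement operator H to every member's state vector."""
    n = len(state_vectors)
    if callable(measurement_operator):
        # One shared operator for the whole ensemble.
        operators = [measurement_operator] * n
    else:
        # Assumption: a list holds one operator per member.
        operators = list(measurement_operator)
        if len(operators) != n:
            raise ValueError("Need one measurement operator per member.")
    return [op(z) for op, z in zip(operators, state_vectors)]


def extract_last(z):
    """Example H: the observed variable is the last state entry."""
    return z[-1]


def extract_first(z):
    """Example H: the observed variable is the first state entry."""
    return z[0]


state_vectors = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
shared = get_predicted_values_sketch(state_vectors, extract_last)
per_member = get_predicted_values_sketch(state_vectors,
                                         [extract_first, extract_last])
```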
- remove_negative()
If only one model is loaded and it is hydrological, sets negative numbers to positive. Other models, such as the Lorenz model, can be negative.
- config_specific_actions(pre_set_state)
Function for actions which are specific to a combination of model with method.
Note
Be specific when these are used to only occur when wanted
- #1: PF & HBV:
Particle filters replace the full particle: thus the lag function also needs to be copied.
If only HBV models are implemented with PF, this will be updated.
If HBV and other models are implemented, this will present a RuntimeWarning.
If other models are implemented with PF, nothing should happen, just a UserWarning so that you’re aware.
- get_value(var_name: str) numpy.ndarray
Gets the current value of the whole ensemble for a given variable. Currently assumes 2D; fix for 1D.
- static get_value_run(ensemble, var_name, i)
- get_state_vector() numpy.ndarray
Gets current value of whole ensemble for specified state vector
Note
Assumes 1D array? although np.vstack does work for 2D arrays
- static get_state_vector_run(ensemble, i)
- set_value(var_name: str, src: numpy.ndarray) None
Sets current value of whole ensemble for given variable
- Parameters:
src (np.ndarray) – size = number of ensemble members x 1 [N x 1]
- static set_value_run(ensemble, var_name, src_i, i)
- set_state_vector(src: numpy.ndarray) None
Sets current value of whole ensemble for specified state vector
- Parameters:
src (np.ndarray) – size = number of ensemble members x number of states in state vector [N x len(z)]. src[0] should return the state vector for the first ensemble member.
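The expected layout, one row per ensemble member, can be sketched as follows. `ToyMember` and the two helper functions are hypothetical, and nested lists stand in for the N x len(z) NumPy array.

```python
class ToyMember:
    """Hypothetical ensemble member holding a 1D state vector."""

    def __init__(self, state_vector):
        self.state_vector = list(state_vector)


def get_state_vector_sketch(members):
    """Stack the members' state vectors into an N x len(z) structure."""
    return [list(m.state_vector) for m in members]


def set_state_vector_sketch(members, src):
    """Give row i of src to member i, checking both dimensions."""
    if len(src) != len(members):
        raise ValueError("src needs one row per ensemble member.")
    for member, row in zip(members, src):
        if len(row) != len(member.state_vector):
            raise ValueError("Each row needs len(z) entries.")
        member.state_vector = list(row)


members = [ToyMember([0.0, 0.0, 0.0]) for _ in range(4)]
src = [[i + 0.1, i + 0.2, i + 0.3] for i in range(4)]
set_state_vector_sketch(members, src)
```

Reading the state back with `get_state_vector_sketch(members)` then returns the same rows in the same order.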
- static set_state_vector_run(ensemble, src_i, i)
- static load_netcdf(observation_path: pathlib.Path, observed_variable_name: str) xarray.DataArray
Load the observation data file supplied by user
- class ewatercycle_DA.DA.EnsembleMember
Bases: pydantic.BaseModel
Class containing ensemble members, meant to be called by the DA.Ensemble class
- Parameters:
model_name (str | list[str]) – takes only the model's string name for now; change to more formal config file later? Should you pass a list here, you also need a list of forcing objects & vice versa.
forcing (ewatercycle.base.forcing.DefaultForcing | list) – object or list of objects to give to the model. Should you want to vary the forcing, you also need to pass a list of models.
setup_kwargs (dict | list[dict]) – kwargs dictionary which can be passed as model.setup(**setup_kwargs). UserWarning: ensure your model saves all kwargs to the config. Should you want to vary initial parameters, again all should be a list.
state_vector_variables (Optional[str | list[str]]) – can be set to 'all' for known parameters; this is highly model and scenario specific & should be implemented separately. Currently known to work for:
ewatercycle-HBV
…
Can be set by passing a list of strings naming the variables to include in the state vector.
Changing to a subset allows you to do interesting things with ensembles: mainly limited to particle filters.
For example giving half the particle filters more variables which vary than others - see what that does.
TODO: refactor to be more on the fly
- model
instance of eWaterCycle model to be used. Must be defined in the loaded_models dictionary in this file, which is a safeguard against misuse.
- Type:
ewatercycle.base.model
- config
path to config file for the model which the EnsembleMember contains.
- Type:
Path
- state_vector
NumPy array containing the most recently retrieved states
- Type:
np.ndarray
- variable_names
list of strings naming the variables in the state vector.
- Type:
list[str]
- loaded_models
dictionary containing model names and their corresponding instances
- Type:
dict[str, Any]
- model_name: str | None
- forcing: ewatercycle.base.forcing.DefaultForcing | None
- setup_kwargs: dict | None
- state_vector_variables: str | list | None
- model: Any | None
- config: str | None
- cfg_dir: pathlib.Path | None
- state_vector: Any | None
- variable_names: list[str] | None
- loaded_models: dict[str, Any]
- setup(cfg_dir) None
Sets up the model with the provided forcing and kwargs, and sets the config file.
- initialize() None
Initializes the model with the config file generated in setup
- set_state_vector_variable()
Sets the list of variables required to obtain the state vector
- get_value(var_name: str) numpy.ndarray
Gets current value of an ensemble member
- get_state_vector() numpy.ndarray
Gets current state vector of ensemble member.
Note: assumes a 1D grid currently, as state_vector is a 1D array. Now checks the shape of data and variables.
- set_value(var_name: str, src: numpy.ndarray) None
Sets current value of an ensemble member
- set_state_vector(src: numpy.ndarray) None
Sets current state vector of ensemble member. TODO: check this.
Note: assumes a 1D grid currently, as state_vector is a 1D array.
- finalize(remove_config) None
Finalizes the model: closing containers etc. if necessary
- update() None
Updates the model to the next timestep
- verify_model_loaded() None
Checks whether specified model is available.
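A check of this kind could look like the sketch below, which treats loaded_models as a plain name-to-class registry. The function name, the `ToyLocalModel` class, the placeholder registry entries and the choice of `KeyError` are all assumptions made for illustration.

```python
class ToyLocalModel:
    """Hypothetical local model class, standing in for a real model."""


def verify_model_loaded_sketch(model_name, loaded_models):
    """Raise if model_name has no entry in the registry."""
    if model_name not in loaded_models:
        raise KeyError(
            f"Model '{model_name}' is not loaded; "
            f"available: {sorted(loaded_models)}"
        )


# Placeholder registry; the real one maps names to model classes.
loaded_models = {"HBV": object, "Lorenz": object}

# Registering a local model makes it pass the check.
loaded_models.update({"ToyLocalModel": ToyLocalModel})
verify_model_loaded_sketch("ToyLocalModel", loaded_models)
```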
- ewatercycle_DA.DA.validate_method(method)
Checks the user-supplied method.
- ewatercycle_DA.DA.validity_initialize_input(model_name, forcing, setup_kwargs) None
Checks user input to avoid confusion: if model_name is a list, all others must be too.
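The all-or-nothing list rule can be expressed as a small validator. The sketch below is a hypothetical re-implementation for illustration only; the real function's error type and messages are not documented on this page.

```python
def validity_initialize_input_sketch(model_name, forcing, setup_kwargs):
    """Require that either no argument is a list, or all are equal-length lists."""
    args = {
        "model_name": model_name,
        "forcing": forcing,
        "setup_kwargs": setup_kwargs,
    }
    list_args = [name for name, value in args.items()
                 if isinstance(value, list)]
    if not list_args:
        return  # All scalars: one configuration shared by every member.
    if len(list_args) != len(args):
        missing = sorted(set(args) - set(list_args))
        raise ValueError(f"Also pass lists for: {missing}")
    lengths = {len(value) for value in args.values()}
    if len(lengths) != 1:
        raise ValueError("All lists must have the same length.")


# Valid: everything scalar, or everything a list of the same length.
validity_initialize_input_sketch("HBV", "forcing", {"a": 1})
validity_initialize_input_sketch(["HBV", "HBV"], ["f1", "f2"], [{}, {}])
```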