Source code for metsim.metsim

"""
The main object of the MetSim package. The MetSim object
is used to set up and launch forcing generation and/or
disaggregation routines.

The MetSim object uses a class dictionary to refer to
the model setup, which can be modified after instantiation
if necessary.  Before calling `run` or `launch` on the
instance it is required to call the `load` function to
ensure that all of the required parameters have been
set and that the input data is sufficient to provide the
output specified.
"""
# Meteorology Simulator
# Copyright (C) 2017  The Computational Hydrology Group, Department of Civil
# and Environmental Engineering, University of Washington.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import json
import logging
import itertools
import time as tm
from getpass import getuser
from multiprocessing import Pool

import numpy as np
import pandas as pd
import xarray as xr
from collections import OrderedDict, Iterable

from metsim import io
from metsim.methods import mtclim
from metsim.disaggregate import disaggregate
from metsim.physics import solar_geom
import metsim.constants as cnst
from metsim.datetime import date_range

references = '''Thornton, P.E., and S.W. Running, 1999. An improved algorithm for estimating incident daily solar radiation from measurements of temperature, humidity, and precipitation. Agricultural and Forest Meteorology, 93:211-228.
Kimball, J.S., S.W. Running, and R. Nemani, 1997. An improved method for estimating surface humidity from daily minimum temperature. Agricultural and Forest Meteorology, 85:87-98.
Bohn, T. J., B. Livneh, J. W. Oyler, S. W. Running, B. Nijssen, and D. P. Lettenmaier, 2013a: Global evaluation of MT-CLIM and related algorithms for forcing of ecological and hydrological models, Agr. Forest. Meteorol., 176, 38-49, doi:10.1016/j.agrformet.2013.03.003.'''

now = tm.ctime(tm.time())
user = getuser()
formatter = logging.Formatter(' - '.join(
    ["%(asctime)s", "%(name)s", "%(levelname)s", "%(message)s"]))

ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(formatter)
logger = logging.getLogger("metsim")

attrs = {'pet': {'units': 'mm timestep-1', 'long_name': 'potential evaporation',
                 'standard_name': 'water_potential_evaporation_flux'},
         'prec': {'units': 'mm timestep-1', 'long_name': 'precipitation',
                  'standard_name': 'precipitation_flux'},
         'shortwave': {'units': 'W m-2', 'long_name': 'shortwave radiation',
                       'standard_name': 'surface_downwelling_shortwave_flux'},
         'longwave': {'units': 'W m-2', 'long_name': 'longwave radiation',
                      'standard_name': 'surface_downwelling_longwave_flux'},
         't_max': {'units': 'C', 'long_name': 'maximum daily air temperature',
                   'standard_name': 'daily_maximum_air_temperature'},
         't_min': {'units': 'C', 'long_name': 'minimum daily air temperature',
                   'standard_name': 'daily_minimum_air_temperature'},
         'temp': {'units': 'C', 'long_name': 'air temperature',
                  'standard_name': 'air_temperature'},
         'vapor_pressure': {'units': 'kPa', 'long_name': 'vapor pressure',
                            'standard_name': 'vapor_pressure'},
         'air_pressure': {'units': 'kPa', 'long_name': 'air pressure',
                          'standard_name': 'air_pressure'},
         'tskc': {'units': 'fraction', 'long_name': 'cloud fraction',
                  'standard_name': 'cloud_fraction'},
         'rel_humid': {'units': '%', 'long_name': 'relative humidity',
                       'standard_name': 'relative_humidity'},
         'spec_humid': {'units': '', 'long_name': 'specific humidity',
                        'standard_name': 'specific_humidity'},
         'time': {'long_name': 'local time at grid location',
                  'standard_name': 'local_time'},
         '_global': {'conventions': '1.6', 'title': 'Output from MetSim',
                     'institution': 'University of Washington',
                     'source': 'metsim.py',
                     'history': 'Created: {0} by {1}'.format(now, user),
                     'references': references,
                     'documentation': 'Times given are local to their locations.',
                     'comment': 'no comment at this time'}}

attrs = {k: OrderedDict(v) for k, v in attrs.items()}


[docs]class MetSim(object): """ MetSim handles the distribution of jobs that write to a common file by launching muliple processes and queueing up their writeback so that work can be done while IO is happening. """ # Class variables methods = {'mtclim': mtclim} params = { "method": '', "domain": '', "state": '', "out_dir": '', "out_state": '', "out_prefix": 'forcing', "start": '', "stop": '', "time_step": '', "calendar": 'standard', "out_fmt": '', "out_precision": 'f8', "verbose": 0, "sw_prec_thresh": 0.0, "mtclim_swe_corr": False, "annual": False, "lw_cloud": 'cloud_deardorff', "lw_type": 'prata', "tdew_tol": 1e-6, "tmax_daylength_fraction": 0.67, "snow_crit_temp": -6.0, "snow_melt_rate": 0.042, "rain_scalar": 0.75, "tday_coef": 0.45, "lapse_rate": 0.0065, "iter_dims": ['lat', 'lon'], "out_vars": ['temp', 'prec', 'shortwave', 'longwave', 'vapor_pressure', 'rel_humid'] } def __init__(self, params: dict): """ Constructor """ # Record parameters self.params.update(params) logger.setLevel(self.params['verbose']) ch.setLevel(self.params['verbose']) logger.addHandler(ch) self.domain = io.read_domain(self.params) self.met_data = io.read_met_data(self.params, self.domain) self.state = io.read_state(self.params, self.domain) self.met_data['elev'] = self.domain['elev'] self.met_data['lat'] = self.domain['lat'] self._aggregate_state()
[docs] def launch(self): """Farm out the jobs to separate processes""" # Do the forcing generation and disaggregation if required self._validate_setup() nprocs = self.params['nprocs'] time_dim = pd.DatetimeIndex(self.met_data.time.values) iter_list = [self.met_data[dim].values for dim in self.params['iter_dims']] self.disagg = int(self.params['time_step']) < cnst.MIN_PER_DAY self.method = MetSim.methods[self.params['method']] if self.params['annual']: groups = time_dim.groupby(time_dim.year) else: groups = {'total': time_dim} for label, times in groups.items(): self.pool = Pool(processes=nprocs) self.site_generator = itertools.product(*iter_list) logger.info("Beginning {}".format(label)) status = [] # Add in some end point data for continuity in chunking times_ext = times.union([times[0] - pd.Timedelta("1 days"), times[-1] + pd.Timedelta("1 days")] ).intersection(time_dim) data = self.met_data.sel(time=times_ext) self.setup_output(self.met_data.sel(time=times)) for site in self.site_generator: locs = {k: v for k, v in zip(self.params['iter_dims'], site)} # Don't run masked cells if not self.domain['mask'].sel(**locs).values > 0: continue stat = self.pool.apply_async( wrap_run, args=(self.method.run, locs, self.params, data.sel(**locs), self.state.sel(**locs), self.disagg, times, label), callback=self._unpack_results) status.append(stat) self.pool.close() # Check that everything worked [stat.get() for stat in status] self.pool.join() self.write(label)
[docs] def run(self): """ Kicks off the disaggregation and queues up data for IO """ self._validate_setup() time_dim = pd.DatetimeIndex(self.met_data.time.values) iter_list = [self.met_data[dim].values for dim in self.params['iter_dims']] self.disagg = int(self.params['time_step']) < cnst.MIN_PER_DAY self.method = MetSim.methods[self.params['method']] if self.params['annual']: groups = time_dim.groupby(time_dim.year) else: groups = {'total': time_dim} for label, times in groups.items(): logger.info("Beginning {}".format(label)) self.site_generator = itertools.product(*iter_list) # Add in some end point data for continuity times_ext = times.union([times[0] - pd.Timedelta("1 days"), times[-1] + pd.Timedelta("1 days")] ).intersection(time_dim) data = self.met_data.sel(time=times_ext) self.setup_output(self.met_data.sel(time=times)) for site in self.site_generator: locs = {k: v for k, v in zip(self.params['iter_dims'], site)} # Don't run masked cells if not self.domain['mask'].sel(**locs).values > 0: continue wrap_results = wrap_run( self.method.run, locs, self.params, data.sel(**locs), self.state.sel(**locs), self.disagg, times, label) # Cut the returned data down to the correct time index self._unpack_results(wrap_results) self.write(label)
def _unpack_results(self, result: tuple): """Put results into the master dataset""" if len(result) == 3: locs, df, state = result self._unpack_state(state, locs) else: locs, df = result for varname in self.params['out_vars']: try: self.output[varname].loc[locs] = df[varname] except ValueError as e: logger.error(e) logger.error("This error is probably indicitive of a mismatch " "between the domain and input data. Check that " "all of your cells inside of the mask have both " "elevation in the domain as well as all of the " "required input forcings.") raise def _unpack_state(self, result: pd.DataFrame, locs: dict): """Put restart values in the state dataset""" # We concatenate with the old state values in case we don't # have 90 new days to use tmin = np.concatenate((self.state['t_min'].sel(**locs).values[:], result['t_min'].values)) tmax = np.concatenate((self.state['t_max'].sel(**locs).values[:], result['t_max'].values)) prec = np.concatenate((self.state['prec'].sel(**locs).values[:], result['prec'].values)) self.state['t_min'].sel(**locs).values[:] = tmin[-90:] self.state['t_max'].sel(**locs).values[:] = tmax[-90:] self.state['prec'].sel(**locs).values[:] = prec[-90:] self.state['swe'].sel(**locs).values = result['swe'].values[-1] state_start = result.index[-1] - pd.Timedelta('89 days') self.state.time.values = date_range(state_start, result.index[-1], calendar=self.params['calendar']) def setup_output(self, prototype: xr.Dataset=None): if not prototype: prototype = self.met_data self.disagg = int(self.params['time_step']) < cnst.MIN_PER_DAY # Number of timesteps if self.disagg: delta = pd.Timedelta('1 days') - pd.Timedelta( '{} minutes'.format(self.params['time_step'])) else: delta = pd.Timedelta('0 days') start = pd.Timestamp(prototype.time.values[0]).to_pydatetime() stop = pd.Timestamp(prototype.time.values[-1]).to_pydatetime() times = date_range(start, stop + delta, freq="{}T".format(self.params['time_step']), calendar=self.params['calendar']) n_ts = len(times) shape = (n_ts, ) + self.domain['mask'].shape dims = ('time', ) + self.domain['mask'].dims coords = {'time': times, **self.domain.mask.coords} self.output = xr.Dataset(coords=coords) if 'elev' in self.params: self.params.pop('elev') for k, v in self.params.items(): # Need to convert some parameters to strings if k in ['start', 'stop', 'annual', 'mtclim_swe_corr']: v = str(v) # Don't include complex types if isinstance(v, dict): v = json.dumps(v) elif not isinstance(v, str) and isinstance(v, Iterable): v = ', '.join(v) attrs['_global'][k] = v self.output.attrs = attrs['_global'] for varname in self.params['out_vars']: self.output[varname] = xr.DataArray( data=np.full(shape, np.nan), coords=coords, dims=dims, name=varname, attrs=attrs.get(varname, {}), encoding={'dtype': self.params['out_precision'], '_FillValue': cnst.FILL_VALUES['f8']}) self.output.time.attrs.update(attrs['time']) def _aggregate_state(self): """Aggregate data out of the state file and load it into `met_data`""" # Precipitation record begin_record = self.params['start'] - pd.Timedelta("90 days") end_record = self.params['start'] - pd.Timedelta("1 days") record_dates = date_range(begin_record, end_record, calendar=self.params['calendar']) trailing = self.state['prec'].sel(time=record_dates) total_precip = xr.concat([trailing, self.met_data['prec']], dim='time') total_precip = (cnst.DAYS_PER_YEAR * total_precip.rolling( time=90).mean().drop(record_dates, dim='time')) self.met_data['seasonal_prec'] = total_precip # Smoothed daily temperature range trailing = self.state['t_max'] - self.state['t_min'] begin_record = self.params['start'] - pd.Timedelta("90 days") end_record = self.params['start'] - pd.Timedelta("1 days") record_dates = date_range(begin_record, end_record, calendar=self.params['calendar']) trailing['time'] = record_dates dtr = self.met_data['t_max'] - self.met_data['t_min'] sm_dtr = xr.concat([trailing, dtr], dim='time') sm_dtr = sm_dtr.rolling(time=30).mean().drop(record_dates, dim='time') self.met_data['smoothed_dtr'] = sm_dtr # Put in SWE data self.state['swe'] = self.state.sel(time=end_record).swe.drop('time') self.met_data['swe'] = self.state.swe.copy() def _validate_setup(self): """Updates the global parameters dictionary""" errs = [""] # Make sure there's some input if not len(self.params['forcing']): errs.append("Requires input forcings to be specified") # Parameters that can't be empty strings or None non_empty = ['method', 'out_dir', 'out_state', 'start', 'stop', 'time_step', 'out_fmt', 'forcing_fmt', 'domain_fmt', 'state_fmt'] for each in non_empty: if self.params[each] is None or self.params[each] == '': errs.append("Cannot have empty value for {}".format(each)) # Make sure time step divides evenly into a day if cnst.MIN_PER_DAY % int(self.params['time_step']): errs.append("Time step must divide 1440 evenly. Got {}" .format(self.params['time_step'])) # Check for required input variable specification required_in = ['t_min', 't_max', 'prec'] for each in required_in: if each not in self.met_data.variables: errs.append("Input requires {}".format(each)) # Make sure that we are going to write out some data if not len(self.params['out_vars']): errs.append("Output variable list must not be empty") # Check that the parameters specified are available opts = {'mtclim_swe_corr': [True, False], 'out_precision': ['f4', 'f8'], 'lw_cloud': ['default', 'cloud_deardorff'], 'lw_type': ['default', 'tva', 'anderson', 'brutsaert', 'satterlund', 'idso', 'prata']} for k, v in opts.items(): if not self.params[k] in v: errs.append("Invalid option given for {}".format(k)) # If any errors, raise and give a summary if len(errs) > 1: raise Exception("\n ".join(errs))
[docs] def write(self, suffix=""): """ Dispatch to the right function based on the configuration given """ dispatch = { 'netcdf': self.write_netcdf, 'ascii': self.write_ascii, 'data': self.write_data } dispatch[self.params.get('out_fmt', 'netcdf').lower()](suffix)
[docs] def write_netcdf(self, suffix: str): """Write out as NetCDF to the output file""" logger.info("Writing netcdf...") for dirname in [self.params['out_dir'], os.path.dirname(self.params['out_state'])]: os.makedirs(dirname, exist_ok=True) # all state variables are written as doubles state_encoding = {'time': {'dtype': 'f8'}} for v in self.state: state_encoding[v] = {'dtype': 'f8'} # write state file self.state.to_netcdf(self.params['out_state'], encoding=state_encoding) # write output file fname = '{}_{}.nc'.format(self.params['out_prefix'], suffix) output_filename = os.path.join(self.params['out_dir'], fname) self.output.to_netcdf(output_filename, unlimited_dims=['time'], encoding={'time': {'dtype': 'f8'}})
[docs] def write_ascii(self, suffix): """Write out as ASCII to the output file""" logger.info("Writing ascii...") for dirname in [self.params['out_dir'], os.path.dirname(self.params['out_state'])]: os.makedirs(dirname, exist_ok=True) # all state variables are written as doubles state_encoding = {'time': {'dtype': 'f8'}} for v in self.state: state_encoding[v] = {'dtype': 'f8'} # write state file self.state.to_netcdf(self.params['out_state'], encoding=state_encoding) # Need to create new generator to loop over iter_list = [self.met_data[dim].values for dim in self.params['iter_dims']] site_generator = itertools.product(*iter_list) for site in site_generator: locs = {k: v for k, v in zip(self.params['iter_dims'], site)} if not self.domain['mask'].sel(**locs).values > 0: continue fname = ("{}_" * (len(iter_list)+1) + "{}.csv").format( self.params['out_prefix'], *site, suffix) fpath = os.path.join(self.params['out_dir'], fname) self.output.sel(**locs)[self.params[ 'out_vars']].to_dataframe().to_csv(fpath)
def write_data(self, suffix): pass
[docs]def wrap_run(func: callable, loc: dict, params: dict, ds: xr.Dataset, state: xr.Dataset, disagg: bool, out_times: pd.DatetimeIndex, year: str): """ Iterate over a chunk of the domain. This is wrapped so we can return a tuple of locs and df. Parameters ---------- func: callable The function to call to do the work loc: dict Some subset of the domain to do work on params: dict Parameters from a MetSim object ds: xr.Dataset Input forcings and domain state: xr.Dataset State variables at the point of interest disagg: bool Whether or not we should run a disagg routine out_times: pd.DatetimeIndex Times to return (should be trimmed 1 day at each end from the given index) year: str The year being run. This is used to add on extra times to make output smooth at endpoints if the run is chunked in time. Returns ------- results A list of tuples arranged as (location, hourly_output, daily_output) """ logger.info("Processing {}".format(loc)) lat = ds['lat'].values elev = ds['elev'].values swe = ds['swe'].values params['elev'] = elev df = ds.to_dataframe() # solar_geom returns a tuple due to restrictions of numba # for clarity we convert it to a dictionary here sg = solar_geom(elev, lat, params['lapse_rate']) sg = {'tiny_rad_fract': sg[0], 'daylength': sg[1], 'potrad': sg[2], 'tt_max0': sg[3]} # Generate the daily values - these are saved # so that we can use a subset of them to write # out the state file later df_base = func(df, params, sg, elev=elev, swe=swe) if disagg: # Get some values for padding the time list, # so that when interpolating in the disaggregation # functions we can match endpoints with adjoining # chunks - if no data is available, just repeat some # default values (this case is used at the very # beginning and end of the record) try: prevday = out_times[0] - pd.Timedelta('1 days') t_begin = [ds['t_min'].sel(time=prevday), ds['t_max'].sel(time=prevday)] except (KeyError, ValueError): t_begin = [state['t_min'].values[-1], state['t_max'].values[-1]] try: nextday = pd.datetime(int(year)+1, 1, 1) t_end = [ds['t_min'].sel(time=nextday), ds['t_max'].sel(time=nextday)] except (KeyError, ValueError): # None so that we don't extend the record t_end = None # Disaggregate to subdaily values df_complete = disaggregate(df, params, sg, t_begin, t_end) # Calculate the times that we want to get out by chopping # off the endpoints that were added on previously start = out_times[0] stop = (out_times[-1] + pd.Timedelta('1 days') - pd.Timedelta("{} minutes".format(params['time_step']))) new_times = date_range( start, stop, freq='{}T'.format(params['time_step']), calendar=params['calendar']) else: # convert srad to daily average flux from daytime flux df_base['shortwave'] *= df_base['dayl'] / cnst.SEC_PER_DAY # If we're outputting daily values, we dont' need to # change the output dates - see inside of `if` condition # above for more explanation new_times = out_times df_complete = df_base # Cut the returned data down to the correct time index df_complete = df_complete.loc[new_times[0]:new_times[-1]] return (loc, df_complete, df_base)