"""
The main object of the MetSim package. The MetSim object
is used to set up and launch forcing generation and/or
disaggregation routines.
The MetSim object uses a class dictionary to refer to
the model setup, which can be modified after instantiation
if necessary. Before calling `run` or `launch` on the
instance it is required to call the `load` function to
ensure that all of the required parameters have been
set and that the input data is sufficient to provide the
output specified.
"""
# Meteorology Simulator
# Copyright (C) 2017 The Computational Hydrology Group, Department of Civil
# and Environmental Engineering, University of Washington.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import itertools
import json
import logging
import os
import sys
import time as tm
from collections import Iterable, OrderedDict
from getpass import getuser
import numpy as np
import pandas as pd
import xarray as xr
import dask
from dask.diagnostics import ProgressBar
from netCDF4 import Dataset
from cftime import date2num
from xarray.backends.locks import get_write_lock, combine_locks, NETCDFC_LOCK
import metsim.constants as cnst
from metsim import io
from metsim.datetime import date_range
from metsim.disaggregate import disaggregate
from metsim.methods import mtclim
from metsim.physics import solar_geom
NO_SLICE = {}
DASK_CORE_SCHEDULERS = ['multiprocessing', 'threading', 'synchronous',
'processes', 'threads', 'single-threaded', 'sync']
references = '''Thornton, P.E., and S.W. Running, 1999. An improved algorithm for estimating incident daily solar radiation from measurements of temperature, humidity, and precipitation. Agricultural and Forest Meteorology, 93:211-228.
Kimball, J.S., S.W. Running, and R. Nemani, 1997. An improved method for estimating surface humidity from daily minimum temperature. Agricultural and Forest Meteorology, 85:87-98.
Bohn, T. J., B. Livneh, J. W. Oyler, S. W. Running, B. Nijssen, and D. P. Lettenmaier, 2013a: Global evaluation of MT-CLIM and related algorithms for forcing of ecological and hydrological models, Agr. Forest. Meteorol., 176, 38-49, doi:10.1016/j.agrformet.2013.03.003.'''
DEFAULT_TIME_UNITS = 'minutes since 2000-01-01 00:00:00.0'
now = tm.ctime(tm.time())
user = getuser()
attrs = {'pet': {'units': 'mm timestep-1',
'long_name': 'potential evaporation',
'standard_name': 'water_potential_evaporation_flux',
'missing_value': np.nan, 'fill_value': np.nan},
'prec': {'units': 'mm timestep-1', 'long_name': 'precipitation',
'standard_name': 'precipitation_flux',
'missing_value': np.nan, 'fill_value': np.nan},
'shortwave': {'units': 'W m-2', 'long_name': 'shortwave radiation',
'standard_name': 'surface_downwelling_shortwave_flux',
'missing_value': np.nan, 'fill_value': np.nan},
'longwave': {'units': 'W m-2', 'long_name': 'longwave radiation',
'standard_name': 'surface_downwelling_longwave_flux',
'missing_value': np.nan, 'fill_value': np.nan},
't_max': {'units': 'C', 'long_name': 'maximum daily air temperature',
'standard_name': 'daily_maximum_air_temperature',
'missing_value': np.nan, 'fill_value': np.nan},
't_min': {'units': 'C', 'long_name': 'minimum daily air temperature',
'standard_name': 'daily_minimum_air_temperature',
'missing_value': np.nan, 'fill_value': np.nan},
'temp': {'units': 'C', 'long_name': 'air temperature',
'standard_name': 'air_temperature',
'missing_value': np.nan, 'fill_value': np.nan},
'vapor_pressure': {'units': 'kPa', 'long_name': 'vapor pressure',
'standard_name': 'vapor_pressure',
'missing_value': np.nan, 'fill_value': np.nan},
'air_pressure': {'units': 'kPa', 'long_name': 'air pressure',
'standard_name': 'air_pressure',
'missing_value': np.nan, 'fill_value': np.nan},
'tskc': {'units': 'fraction', 'long_name': 'cloud fraction',
'standard_name': 'cloud_fraction',
'missing_value': np.nan, 'fill_value': np.nan},
'rel_humid': {'units': '%', 'long_name': 'relative humidity',
'standard_name': 'relative_humidity',
'missing_value': np.nan, 'fill_value': np.nan},
'spec_humid': {'units': '', 'long_name': 'specific humidity',
'standard_name': 'specific_humidity',
'missing_value': np.nan, 'fill_value': np.nan},
'wind': {'units': 'm/s', 'long_name': 'wind speed',
'standard_name': 'wind_speed',
'missing_value': np.nan, 'fill_value': np.nan},
'_global': {'conventions': '1.6', 'title': 'Output from MetSim',
'institution': 'University of Washington',
'source': 'metsim.py',
'history': 'Created: {0} by {1}'.format(now, user),
'references': references,
'comment': 'no comment at this time'}}
attrs = {k: OrderedDict(v) for k, v in attrs.items()}
[docs]class MetSim(object):
"""
MetSim handles the distribution of jobs that write to a common file
by launching muliple processes and queueing up their writeback so that
work can be done while IO is happening.
"""
# Class variables
methods = {'mtclim': mtclim}
params = {
"period_ending": False,
"is_worker": False,
"method": 'mtclim',
"domain": '',
"state": '',
"out_dir": '',
"out_prefix": 'forcing',
"start": 'forcing',
"stop": 'forcing',
"time_step": -1,
"calendar": 'standard',
"prec_type": 'uniform',
"out_precision": 'f4',
"verbose": 0,
"sw_prec_thresh": 0.0,
"utc_offset": False,
"lw_cloud": 'cloud_deardorff',
"lw_type": 'prata',
"tdew_tol": 1e-6,
"tmax_daylength_fraction": 0.67,
"rain_scalar": 0.75,
"tday_coef": 0.45,
"lapse_rate": 0.0065,
"out_vars": ['temp', 'prec', 'shortwave', 'longwave',
'vapor_pressure', 'rel_humid'],
"out_freq": None,
"chunks": NO_SLICE,
"scheduler": 'distributed',
"num_workers": 1,
}
def __init__(self, params: dict, domain_slice=NO_SLICE):
"""
Constructor
"""
self._domain = None
self._met_data = None
self._state = None
self._client = None
self._domain_slice = domain_slice
self.progress_bar = ProgressBar()
self.params.update(params)
logging.captureWarnings(True)
self.logger = logging.getLogger(__name__)
self.logger.setLevel(self.params['verbose'])
formatter = logging.Formatter(' - '.join(
['%asctime)s', '%(name)s', '%(levelname)s', '%(message)s']))
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(formatter)
ch.setLevel(self.params['verbose'])
# set global dask scheduler
if domain_slice is NO_SLICE:
if self.params['scheduler'] in DASK_CORE_SCHEDULERS:
dask.config.set(scheduler=self.params['scheduler'])
else:
from distributed import Client, progress
if 'distributed' == self.params['scheduler']:
self._client = Client(
n_workers=self.params['num_workers'],
threads_per_worker=1)
if self.params['verbose'] == logging.DEBUG:
self.progress_bar = progress
elif os.path.isfile(self.params['scheduler']):
self._client = Client(
scheduler_file=self.params['scheduler'])
else:
self._client = Client(self.params['scheduler'])
else:
dask.config.set(scheduler=self.params['scheduler'])
# Set up logging
# If in verbose mode set up the progress bar
if self.params['verbose'] == logging.DEBUG:
if 'distributed' != self.params['scheduler']:
self.progress_bar.register()
self.progress_bar = lambda x: x
else:
# If not in verbose mode, create a dummy function
self.progress_bar = lambda x: x
# Create time vector(s)
self._times = self._get_output_times(
freq=self.params['out_freq'],
period_ending=self.params['period_ending'])
def _validate_force_times(self, force_times):
for p, i in [('start', 0), ('stop', -1)]:
# infer times from force_times
if isinstance(self.params[p], str):
if self.params[p] == 'forcing':
self.params[p] = pd.Timestamp(
force_times.values[i]).to_pydatetime()
elif '/' in self.params[p]:
year, month, day = map(int, self.params[p].split('/'))
self.params[p] = pd.datetime(year, month, day)
else:
self.params[p] = pd.to_datetime(self.params[p])
# update calendar from input data (fall back to params version)
self.params['calendar'] = self.met_data['time'].encoding.get(
'calendar', self.params['calendar'])
assert self.params['start'] >= pd.Timestamp(
force_times.values[0]).to_pydatetime()
assert self.params['stop'] <= pd.Timestamp(
force_times.values[-1]).to_pydatetime()
self.params['state_start'] = (self.params['start'] -
pd.Timedelta("90 days"))
self.params['state_stop'] = (self.params['start'] -
pd.Timedelta("1 days"))
if self.params['utc_offset']:
attrs['time'] = {'units': DEFAULT_TIME_UNITS,
'long_name': 'UTC time',
'standard_name': 'utc_time'}
else:
attrs['time'] = {'units': DEFAULT_TIME_UNITS,
'long_name': 'local time at grid location',
'standard_name': 'local_time'}
@property
def domain(self):
if self._domain is None:
self._domain = io.read_domain(self.params).isel(
**self._domain_slice)
return self._domain
@property
def met_data(self):
if self._met_data is None:
self._met_data = io.read_met_data(self.params, self.domain)
self._met_data['elev'] = self.domain['elev']
self._met_data['lat'] = self.domain['lat']
self._met_data['lon'] = self.domain['lon']
# process constant_vars
constant_vars = self.params.get('constant_vars', None)
if constant_vars:
da_template = self._met_data[list(self._met_data)[0]]
for var in constant_vars.keys():
self._met_data[var] = xr.full_like(da_template,
float(constant_vars[var]))
self._validate_force_times(force_times=self._met_data['time'])
return self._met_data
@property
def state(self):
if self._state is None:
self._state = io.read_state(self.params, self.domain)
self._aggregate_state()
return self._state
@property
def slices(self):
if not self.params['chunks']:
return [{d: slice(None) for d in self.domain[['mask']].dims}]
return chunk_domain(self.params['chunks'], self.domain[['mask']].dims)
def open_output(self):
filenames = [self._get_output_filename(times) for times in self._times]
return xr.open_mfdataset(filenames)
def run(self):
self._validate_setup()
write_locks = {}
for times in self._times:
filename = self._get_output_filename(times)
self.setup_netcdf_output(filename, times)
write_locks[filename] = combine_locks([NETCDFC_LOCK, get_write_lock(filename)])
self.logger.info('Starting {} chunks...'.format(len(self.slices)))
delayed_objs = [wrap_run_slice(self.params, write_locks, dslice)
for dslice in self.slices]
persisted = dask.persist(delayed_objs, num_workers=self.params['num_workers'])
self.progress_bar(persisted)
dask.compute(persisted)
self.logger.info('Cleaning up...')
try:
self._client.cluster.close()
self._client.close()
if self.params['verbose'] == logging.DEBUG:
print()
print('closed dask cluster/client')
except Exception:
pass
def load_inputs(self, close=True):
self._domain = self.domain.load()
self._met_data = self.met_data.load()
self._state = self.state.load()
if close:
self._domain.close()
self._met_data.close()
self._state.close()
[docs] def setup_netcdf_output(self, filename, times):
'''setup a single netcdf file'''
with Dataset(filename, mode="w") as ncout:
# dims
dim_sizes = (None, ) + self.domain['mask'].shape
var_dims = ('time', ) + self.domain['mask'].dims
chunksizes = [len(times)]
for d, s in zip(var_dims[1:], dim_sizes[1:]):
c = int(self.params['chunks'].get(d, s))
if c <= s:
chunksizes.append(c)
else:
chunksizes.append(s)
create_kwargs = {'chunksizes': chunksizes}
for d, size in zip(var_dims, dim_sizes):
ncout.createDimension(d, size)
# vars
for varname in self.params['out_vars']:
ncout.createVariable(
varname, self.params['out_precision'], var_dims,
**create_kwargs)
# add metadata and coordinate variables (time/lat/lon)
time_var = ncout.createVariable('time', 'i4', ('time', ))
time_var.calendar = self.params['calendar']
time_var[:] = date2num(
times.to_pydatetime(),
units=attrs['time'].get('units', DEFAULT_TIME_UNITS),
calendar=time_var.calendar)
dtype_map = {'float64': 'f8', 'float32': 'f4',
'int64': 'i8', 'int32': 'i4'}
for dim in self.domain['mask'].dims:
dim_vals = self.domain[dim].values
dim_dtype = dtype_map.get(
str(dim_vals.dtype), self.params['out_precision'])
dim_var = ncout.createVariable(dim, dim_dtype, (dim, ))
dim_var[:] = dim_vals
for p in ['elev', 'lat', 'lon', 'is_worker']:
if p in self.params:
self.params.pop(p)
for k, v in self.params.items():
# Need to convert some parameters to strings
if k in ['start', 'stop', 'utc_offset', 'period_ending']:
v = str(v)
elif k in ['state_start', 'state_stop', 'out_freq']:
# skip
continue
# Don't include complex types
if isinstance(v, dict):
v = json.dumps(v)
elif not isinstance(v, str) and isinstance(v, Iterable):
v = ', '.join(v)
if isinstance(v, str):
v = v.replace("'", "").replace('"', "")
attrs['_global'][k] = v
# set global attrs
for key, val in attrs['_global'].items():
setattr(ncout, key, val)
# set variable attrs
for varname in ncout.variables:
for key, val in attrs.get(varname, {}).items():
setattr(ncout.variables[varname], key, val)
[docs] def write_chunk(self, locks=None):
'''write data from a single chunk'''
if not len(self.params['out_vars']):
return
for times in self._times:
filename = self._get_output_filename(times)
lock = locks.get(filename, DummyLock())
time_slice = slice(times[0], times[-1])
with lock:
with Dataset(filename, mode="r+") as ncout:
for varname in self.params['out_vars']:
dims = ncout.variables[varname].dimensions[1:]
write_slice = ((slice(None), ) + tuple(
self._domain_slice[d] for d in dims))
ncout.variables[varname][write_slice] = (
self.output[varname].sel(time=time_slice).values)
[docs] def run_slice(self):
"""
Run a single slice of
"""
self._validate_setup()
self.disagg = int(self.params['time_step']) < cnst.MIN_PER_DAY
self.method = MetSim.methods[self.params['method']]
self.setup_output()
times = self.met_data['time']
params = self.params.copy()
# transform input parameters to floating point values
params['sw_prec_thresh'] = float(params['sw_prec_thresh'])
params['rain_scalar'] = float(params['rain_scalar'])
params['tdew_tol'] = float(params['tdew_tol'])
params['tmax_daylength_fraction'] = float(params['tmax_daylength_fraction'])
params['tday_coef'] = float(params['tday_coef'])
params['tmax_daylength_fraction'] = float(params['tmax_daylength_fraction'])
params['lapse_rate'] = float(params['lapse_rate'])
for index, mask_val in np.ndenumerate(self.domain['mask'].values):
if mask_val > 0:
locs = {d: i for d, i in zip(self.domain['mask'].dims, index)}
if self.params['prec_type'].upper() in ['TRIANGLE', 'MIX']:
# add variables for triangle precipitation disgregation
# method to parameters
params['dur'], params['t_pk'] = (
add_prec_tri_vars(self.domain.isel(**locs)))
else:
continue
df, state = wrap_run_cell(self.method.run, params,
self.met_data.isel(**locs),
self.state.isel(**locs),
self.disagg, times)
# Cut the returned data down to the correct time index
for varname in self.params['out_vars']:
self.output[varname][locs] = df[varname].values
def _unpack_state(self, result: pd.DataFrame, locs: dict):
"""Put restart values in the state dataset"""
# We concatenate with the old state values in case we don't
# have 90 new days to use
tmin = np.concatenate((self.state['t_min'].isel(**locs).values[:],
result['t_min'].values))
tmax = np.concatenate((self.state['t_max'].isel(**locs).values[:],
result['t_max'].values))
prec = np.concatenate((self.state['prec'].isel(**locs).values[:],
result['prec'].values))
self.state['t_min'].isel(**locs).values[:] = tmin[-90:]
self.state['t_max'].isel(**locs).values[:] = tmax[-90:]
self.state['prec'].isel(**locs).values[:] = prec[-90:]
state_start = result.index[-1] - pd.Timedelta('89 days')
self.state['time'].values = date_range(
state_start, result.index[-1], calendar=self.params['calendar'])
def _get_output_times(self, freq=None, period_ending=False):
"""
Generate chunked time vectors
Parameters
----------
freq:
Output frequency. Given as a Pandas timegrouper string.
If not given, the entire timeseries will be used.
period_ending:
Flag to specify if output timesteps should be period-
ending. Default is period-beginning
Returns
-------
times:
A list of timeseries which represent each of times that
output files will be created for.
"""
prototype = self.met_data
self.disagg = int(self.params['time_step']) < cnst.MIN_PER_DAY
if self.disagg:
delta = pd.Timedelta('1 days') - pd.Timedelta(
'{} minutes'.format(self.params['time_step']))
else:
delta = pd.Timedelta('0 days')
if period_ending:
offset = pd.Timedelta(
'{} minutes'.format(self.params['time_step']))
else:
offset = pd.Timedelta('0 minutes')
start = pd.Timestamp(prototype['time'].values[0]).to_pydatetime()
stop = pd.Timestamp(prototype['time'].values[-1]).to_pydatetime()
times = date_range(start + offset, stop + offset + delta,
freq="{}T".format(self.params['time_step']),
calendar=self.params['calendar'])
if freq is None or freq == '':
times = [times]
else:
dummy = pd.Series(np.arange(len(times)), index=times)
grouper = pd.Grouper(freq=freq)
times = [t.index for k, t in dummy.groupby(grouper)]
return times
def _get_output_filename(self, times):
suffix = self.get_nc_output_suffix(times)
fname = '{}_{}.nc'.format(self.params['out_prefix'], suffix)
output_filename = os.path.join(
os.path.abspath(self.params['out_dir']), fname)
return output_filename
def setup_output(self):
# output times
times = self._get_output_times(
freq=None, period_ending=self.params['period_ending'])[0]
# Number of timesteps
n_ts = len(times)
shape = (n_ts, ) + self.domain['mask'].shape
dims = ('time', ) + self.domain['mask'].dims
coords = {'time': times, **self.domain['mask'].coords}
self.output = xr.Dataset(coords=coords)
self.output['time'].encoding['calendar'] = self.params['calendar']
dtype = self.params['out_precision']
for varname in self.params['out_vars']:
self.output[varname] = xr.DataArray(
data=np.full(shape, np.nan, dtype=dtype),
coords=coords, dims=dims,
name=varname, attrs=attrs.get(varname, {}))
self.output['time'].attrs.update(attrs['time'])
def _aggregate_state(self):
"""Aggregate data out of the state file and load it into `met_data`"""
# Precipitation record
assert self.state.dims['time'] == 90, self.state['time']
record_dates = date_range(self.params['state_start'],
self.params['state_stop'],
calendar=self.params['calendar'])
trailing = self.state['prec']
trailing['time'] = record_dates
total_precip = xr.concat([trailing, self.met_data['prec']],
dim='time').load()
total_precip = (cnst.DAYS_PER_YEAR * total_precip.rolling(
time=90).mean().sel(time=slice(self.params['start'],
self.params['stop'])))
self.met_data['seasonal_prec'] = total_precip
# Smoothed daily temperature range
trailing = self.state['t_max'] - self.state['t_min']
trailing['time'] = record_dates
dtr = self.met_data['t_max'] - self.met_data['t_min']
if (dtr < 0).any():
raise ValueError("Daily maximum temperature lower"
" than daily minimum temperature!")
sm_dtr = xr.concat([trailing, dtr], dim='time').load()
sm_dtr = sm_dtr.rolling(time=30).mean().drop(record_dates, dim='time')
self.met_data['dtr'] = dtr
self.met_data['smoothed_dtr'] = sm_dtr
def _validate_setup(self):
"""Updates the global parameters dictionary"""
errs = [""]
# Make sure there's some input
if not len(self.params.get('forcing', [])):
errs.append("Requires input forcings to be specified")
# Make sure there is at least one forcing_var
# They cannot all be constant since we use one as a template
# for the others
if not len(self.params.get('forcing_vars', [])):
errs.append("Requires at least one non-constant forcing")
# Parameters that can't be empty strings or None
non_empty = ['out_dir', 'time_step', 'forcing_fmt']
for each in non_empty:
if self.params.get(each, None) is None or self.params[each] == '':
errs.append("Cannot have empty value for {}".format(each))
# Make sure time step divides evenly into a day
if (cnst.MIN_PER_DAY % int(self.params.get('time_step', -1)) or
(int(self.params['time_step']) > (6 * cnst.MIN_PER_HOUR) and
int(self.params['time_step']) != cnst.MIN_PER_DAY)):
errs.append("Time step must be evenly divisible into 1440 "
"minutes (24 hours) and less than 360 minutes "
"(6 hours). Got {}.".format(self.params['time_step']))
# Check for required input variable specification
if self.met_data is not None:
required_in = ['t_min', 't_max', 'prec']
for each in required_in:
if each not in self.met_data.variables:
errs.append("Input requires {}".format(each))
# Make sure that we are going to write out some data
if not len(self.params.get('out_vars', [])):
errs.append("Output variable list must not be empty")
# Check output variables are valid
daily_out_vars = ['t_min', 't_max', 't_day', 'prec', 'vapor_pressure',
'shortwave', 'tskc', 'pet', 'wind']
out_var_check = ['temp', 'prec', 'shortwave', 'vapor_pressure',
'air_pressure', 'rel_humid', 'spec_humid',
'longwave', 'tskc', 'wind']
if int(self.params.get('time_step', -1)) == 1440:
out_var_check = daily_out_vars
for var in self.params.get('out_vars', []):
if var not in out_var_check:
errs.append('Cannot output variable {} at timestep {}'.format(
var, self.params['time_step']))
# Check that the parameters specified are available
opts = {'out_precision': ['f4', 'f8'],
'lw_cloud': ['default', 'cloud_deardorff'],
'lw_type': ['default', 'tva', 'anderson',
'brutsaert', 'satterlund',
'idso', 'prata']}
for k, v in opts.items():
if not self.params.get(k, None) in v:
errs.append("Invalid option given for {}".format(k))
# If any errors, raise and give a summary
if len(errs) > 1:
raise Exception("\n ".join(errs))
def get_nc_output_suffix(self, times):
s, e = times[[0, -1]]
template = '{:04d}{:02d}{:02d}-{:04d}{:02d}{:02d}'
return template.format(s.year, s.month, s.day,
e.year, e.month, e.day,)
[docs]def wrap_run_cell(func: callable, params: dict,
ds: xr.Dataset, state: xr.Dataset, disagg: bool,
out_times: pd.DatetimeIndex):
"""
Iterate over a chunk of the domain. This is wrapped
so we can return a tuple of locs and df.
Parameters
----------
func: callable
The function to call to do the work
params: dict
Parameters from a MetSim object
ds: xr.Dataset
Input forcings and domain
state: xr.Dataset
State variables at the point of interest
disagg: bool
Whether or not we should run a disagg routine
out_times: pd.DatetimeIndex
Times to return (should be trimmed 1 day at
each end from the given index)
Returns
-------
df_complete: pd.DataFrame
A dataframe with the disaggregated data in it
df_base: pd.DataFrame
A dataframe with the state data in it
"""
lat = ds['lat'].values.flatten()[0]
lon = ds['lon'].values.flatten()[0]
elev = ds['elev'].values.flatten()[0]
params['elev'] = elev
params['lat'] = lat
params['lon'] = lon
df = ds.to_dataframe()
# solar_geom returns a tuple due to restrictions of numba
# for clarity we convert it to a dictionary here
sg = solar_geom(elev, lat, params['lapse_rate'])
sg = {'tiny_rad_fract': sg[0], 'daylength': sg[1],
'potrad': sg[2], 'tt_max0': sg[3]}
yday = df.index.dayofyear - 1
df['daylength'] = sg['daylength'][yday]
df['potrad'] = sg['potrad'][yday]
df['tt_max'] = sg['tt_max0'][yday]
# Generate the daily values - these are saved
# so that we can use a subset of them to write
# out the state file later
df_base = func(df, params)
if disagg:
# Get some values for padding the time list,
# so that when interpolating in the disaggregation
# functions we can match endpoints with adjoining
# chunks - if no data is available, just repeat some
# default values (this case is used at the very
# beginning and end of the record)
try:
prevday = out_times[0] - pd.Timedelta('1 days')
t_begin = [ds['t_min'].sel(time=prevday),
ds['t_max'].sel(time=prevday)]
except (KeyError, ValueError):
t_begin = [state['t_min'].values[-1],
state['t_max'].values[-1]]
try:
nextday = out_times[-1] + pd.Timedelta('1 days')
t_end = [ds['t_min'].sel(time=nextday),
ds['t_max'].sel(time=nextday)]
except (KeyError, ValueError):
# None so that we don't extend the record
t_end = None
# Disaggregate to subdaily values
df_complete = disaggregate(df, params, sg, t_begin, t_end)
# Calculate the times that we want to get out by chopping
# off the endpoints that were added on previously
start = out_times.values[0]
stop = (out_times.values[-1] + pd.Timedelta('1 days') -
pd.Timedelta("{} minutes".format(params['time_step'])))
if params['period_ending']:
start += pd.Timedelta('{} minutes'.format(params['time_step']))
stop += pd.Timedelta('{} minutes'.format(params['time_step']))
new_times = date_range(
start, stop, freq='{}T'.format(params['time_step']),
calendar=params['calendar'])
else:
# convert srad to daily average flux from daytime flux
df_base['shortwave'] *= df_base['daylength'] / cnst.SEC_PER_DAY
# If we're outputting daily values, we dont' need to
# change the output dates - see inside of `if` condition
# above for more explanation
new_times = out_times
df_complete = df_base
# Cut the returned data down to the correct time index
df_complete = df_complete.loc[new_times[0]:new_times[-1]]
return df_complete, df_base
@dask.delayed()
def wrap_run_slice(params, write_locks, domain_slice=NO_SLICE):
params['is_worker'] = True
ms = MetSim(params, domain_slice=domain_slice)
ms.load_inputs()
ms.run_slice()
ms.write_chunk(locks=write_locks)
[docs]def chunk_domain(chunks, dims):
'''
Return a dictionary of chunk slices that can be used to decompose a grid
'''
def left(chunk, dim):
return np.arange(0, dim, chunk)
def right(chunk, dim):
nums = np.arange(chunk, dim + chunk, chunk)
nums[-1] = dim + 1
return nums
slices = [[slice(*a) for a in (zip(left(int(chunks[dim]), dims[dim]),
right(int(chunks[dim]), dims[dim])))]
for dim in chunks.keys()]
return [dict(zip(chunks.keys(), p)) for p in itertools.product(*slices)]
[docs]class DummyLock(object):
"""DummyLock provides the lock API without any actual locking."""
# This will be available in xarray in the next major version
def acquire(self, *args):
pass
def release(self, *args):
pass
def __enter__(self):
pass
def __exit__(self, *args):
pass
@property
def locked(self):
return False
[docs]def add_prec_tri_vars(domain):
"""
Check that variables for triangle precipitation method exist and have
values that are within allowable ranges. Return these variables.
Parameters
----------
domain:
Dataset of domain variables for given location.
Returns
-------
dur
Array of climatological monthly storm durations. [minutes]
t_pk
Array of climatological monthly times to storm peaks. [minutes]
"""
# Check that variables exist
try:
dur = domain['dur']
except Exception as e:
raise e("Storm duration and time to peak values are "
"required in the domain file for the triangle "
"preciptation disagregation method.")
try:
t_pk = domain['t_pk']
except Exception as e:
raise e("Storm duration and time to peak values are "
"required in the domain file for the triangle "
"preciptation disagregation method.")
# Check that variable values are within allowable ranges.
day_length = cnst.MIN_PER_HOUR * cnst.HOURS_PER_DAY
dur_zero_test = dur <= 0
dur_day_test = dur > day_length
if dur_zero_test.any() or dur_day_test.any():
raise ValueError('Storm duration must be greater than 0 and less than',
day_length, '(i.e. the day length in minutes)')
t_pk_zero_test = t_pk < 0
t_pk_day_test = t_pk > day_length
if t_pk_zero_test.any() or t_pk_day_test.any():
raise ValueError('Storm time to peak must be greater than or equal to '
'0, and less than', day_length,
'(i.e. the end of a day)')
return dur, t_pk