# -*- coding: utf-8 -*-
import collections
import inspect
from collections import OrderedDict
from copy import deepcopy
from functools import wraps
import numpy as np
import xarray as xr
import pandas as pd
from . import decorators
@decorators.NDto2DtoND(
referenceIn=0, noOfInDimsToKeep=1, convertInputs=[0, 1],
convertOutputs=[0], verbosity=10
[docs])
def rayleigh(diameter, K2, frequency):
"""
To do: move this routine into separte module together with t-matrix,
mie etc code.
"""
C = 299792458.
K2 = np.asarray(K2)
diameter = np.asarray(diameter)
wavelength = C / (frequency*1e9)
prefactor = np.pi**5 * K2 / wavelength**4
back_spec = prefactor[:, np.newaxis] * diameter**6
return back_spec
[docs]def provideArgKwargNames(func):
'''Provide all arguments and keyword arguments of a function. Does not
work with *arg and **kwarg
'''
argNames = []
kwargNames = []
insp = inspect.signature(func)
for p in insp.parameters:
default = insp.parameters[p].default
if default is insp.empty:
argNames.append(p)
else:
kwargNames.append(p)
return argNames, kwargNames
[docs]def concatDicts(*dicts):
"""
concatenate (ordered) dicts
"""
dMerged = deepcopy(dicts[0])
for dd in range(1, len(dicts)):
dMerged.update(dicts[dd])
return dMerged
[docs]def swapListItems(li, i1, i2):
'''
Swap i1 and i2 of li.
'''
li = deepcopy(li)
a = li.index(i1)
b = li.index(i2)
li[b], li[a] = li[a], li[b]
return li
[docs]class AttrDict(collections.OrderedDict):
"""Dictionary accesible through attributes"""
def __init__(self, *args, **kwargs):
super(AttrDict, self).__init__(*args, **kwargs)
self.__dict__ = self
[docs]def dimensionToVariables(darray, dimension, variables):
profile = {}
for ii, var in enumerate(variables):
profile[var] = darray.isel(**{dimension: ii})
profile = xr.Dataset(profile)
return profile
[docs]def mergeFlattResults(
core_shapes=None
):
"""Decorator to merge multiple outputs of a function's result
into one joined array. Dimensions NOT in core_shapes are preserved.
"""
def mergeFlattResults_decorator(func):
@wraps(func)
def flatt_func(*args, **kwargs):
result = func(*args, **kwargs)
if type(result) is not tuple:
return result
else:
result = list(result)
for ii in range(len(result)):
res_shape = list(result[ii].shape)
for sh in core_shapes[ii]:
res_shape.remove(sh)
new_shape = tuple(
res_shape) + tuple([np.product(
core_shapes[ii]).astype(int)])
result[ii] = result[ii].reshape(new_shape)
result = np.concatenate(result, axis=-1)
return result
return flatt_func
return mergeFlattResults_decorator
[docs]def apply_ufunc_extended(
func,
*args,
**kwargs
):
"""Extended version of xarray's `xr.apply_ufunc` which can handle with
multiple output of a functions and dask. New keyword output_names
required with a list of the names of the returned variables. Note that
different types in the results are not preserved.
"""
input_core_dims = kwargs.get('input_core_dims', [])
output_sizes = kwargs.pop('output_sizes', {})
output_names = kwargs.pop('output_names', [])
output_core_dims = kwargs.pop('output_core_dims', [])
if len(output_core_dims) == 0:
output_core_dims = [tuple()] * len(output_names)
assert len(output_core_dims) == len(output_names)
OrderedDict(zip(output_names, output_core_dims))
all_input_core_dims = []
for input_core_dim in input_core_dims:
all_input_core_dims.append(*input_core_dim)
all_input_core_dims = list(set(all_input_core_dims))
non_core_dims = []
for arg in args:
non_core_dims += [d for d in arg.dims if d not in all_input_core_dims]
non_core_dims = list(set(non_core_dims))
arg0 = args[0]._to_temp_dataset()
non_core_shape = tuple([arg0.dims[key] for key in non_core_dims])
for ii in range(len(output_core_dims)):
if type(output_core_dims[ii]) is not tuple:
output_core_dims[ii] = tuple([output_core_dims[ii]])
output_shapes = OrderedDict()
output_core_shapes = []
for key, val in zip(output_names, output_core_dims):
output_shapes[key] = non_core_shape + \
tuple([output_sizes[ii] for ii in val])
output_core_shapes.append(tuple([output_sizes[ii] for ii in val])
)
len_tmp_dim = np.sum([np.prod(vv) for vv in output_shapes.values()])
flat_func = mergeFlattResults(output_core_shapes)(func)
results = xr.apply_ufunc(
flat_func,
*args,
output_core_dims=[('<TEMP_DIM>',)],
output_sizes={'<TEMP_DIM>': len_tmp_dim},
**kwargs,
)
ii = 0
this_result = OrderedDict()
for kk, (key, this_shape) in enumerate(
zip(output_names, output_core_shapes)):
new_shape = non_core_shape + this_shape
this_len = np.prod(this_shape).astype(int)
this_result[key] = results.data[..., ii:ii+this_len].reshape(new_shape)
this_ccords = [arg0.coords[k] for k in non_core_dims] + [
xr.DataArray(
range(output_sizes[cc]),
name=cc,
dims=[cc]
) for cc in output_core_dims[kk]]
this_result[key] = xr.DataArray(this_result[key], coords=this_ccords)
ii += this_len
return xr.Dataset(this_result)
[docs]def xrGradient(data, dimension=None):
'''
Wrapper for np.gradient which is not available in xarray
'''
if dimension is None:
axis = 0
else:
axis = data.get_axis_num(dimension)
return xr.DataArray(np.gradient(data, axis=axis), coords=data.coords)
from collections import OrderedDict
from xarray.core.variable import IndexVariable
# https://github.com/pydata/xarray/issues/1560
[docs]def xrFastUnstack(dataset, dim):
"""
Faster version of xarray's unstack. Required until
https://github.com/pydata/xarray/issues/1560 is closed.
Unstack an existing dimension corresponding to a MultiIndex into
multiple new dimensions.
New dimensions will be added at the end.
Parameters
----------
dim : str
Name of the existing dimension to unstack.
Returns
-------
unstacked : Dataset
Dataset with unstacked data.
See also
--------
Dataset.stack
"""
if dim not in dataset.dims:
raise ValueError('invalid dimension: %s' % dim)
index = dataset.get_index(dim)
if not isinstance(index, pd.MultiIndex):
raise ValueError('cannot unstack a dimension that does not have '
'a MultiIndex')
new_dim_names = index.names
new_dim_sizes = [lev.size for lev in index.levels]
full_idx = pd.MultiIndex.from_product(index.levels, names=index.names)
if not index.equals(full_idx):
obj = dataset.reindex(copy=False, **{dim: full_idx})
else:
obj = dataset
variables = OrderedDict()
for name, var in obj.variables.items():
if name != dim:
if dim in var.dims:
new_dims = OrderedDict(zip(new_dim_names, new_dim_sizes))
variables[name] = var.unstack(**{dim: new_dims})
else:
variables[name] = var
for name, lev in zip(new_dim_names, index.levels):
variables[name] = IndexVariable(name, lev)
coord_names = set(dataset._coord_names) - set([dim]) | set(new_dim_names)
return dataset._replace_vars_and_dims(variables, coord_names)