# -*- coding: utf-8 -*-
import warnings
import numpy as np
import xarray as xr
from .. import helpers, units
from ..libs import pyPamtraRadarMoments, pyPamtraRadarSimulator
from .core import microwaveInstrument
[docs]class simpleRadar(microwaveInstrument):
def __init__(
self,
name='simpleRadar',
parent=None,
frequencies='all',
radarK2=0.93,
gaseousAttenuationModel='Rosenkranz98',
applyAttenuation=None,
**kwargs
):
super().__init__(
name=name,
parent=parent,
frequencies=frequencies,
radarK2=radarK2,
applyAttenuation=applyAttenuation,
gaseousAttenuationModel=gaseousAttenuationModel,
**kwargs
)
[docs] def solve(self):
self._link_parent()
self._calcPIA()
if self.parent.nHydrometeors > 0:
wavelength = self.parent.profile.wavelength.sel(
frequency=self.frequencies)
K2 = self.settings['radarK2']
Ze_increment = 0.
MDV_increment = 0.
for hydro in self.hydrometeorProfiles.keys():
bsc = self.hydrometeorProfiles[hydro].backscatterCrossSection\
.sel(frequency=self.frequencies)
N = self.hydrometeorProfiles[hydro]\
.numberConcentration.fillna(0)
Ze_bin = 1.e18*(1.e0/(K2*np.pi**5))*bsc*N*(wavelength)**4
vel = self.hydrometeorProfiles[hydro].fallVelocity
Ze_increment += Ze_bin.sum(['sizeBin'])
MDV_increment += (vel*Ze_bin).sum(['sizeBin'])
# perHydro = []
# for name in self.hydrometeors.keys():
# numberConcentration = self.hydrometeors[
# name].profile.numberConcentration.fillna(0)
# crossSec = self.hydrometeors[name].profile[crossSection]
# if frequencies is not None:
# crossSec.sel(frequency=frequencies)
# thisHydro = crossSec * numberConcentration
# perHydro.append(thisHydro)
# integrated[crossSection] = xr.concat(
# perHydro, dim='hydro').sum(['hydro', 'sizeBin'])
# crossSections = self.parent.getIntegratedScatteringCrossSections(
# crossSections=['backscatterCrossSection'],
# frequencies=self.frequencies,
# )
# back = crossSections['backscatterCrossSection']
# Ze_back = 1.e18*(1.e0/(K2*np.pi**5))*back*(wavelength)**4
self.results['radarReflectivity'] = 10*np.log10(Ze_increment)
self.results['meanDopplerVel'] = MDV_increment/Ze_increment
if self.settings['applyAttenuation'] is None:
pass
elif self.settings['applyAttenuation'] == 'bottomUp':
self.results['radarReflectivity'] -= self.results[
'pathIntegratedAttBottomUp']
elif self.settings['applyAttenuation'] == 'topDown':
self.results['radarReflectivity'] -= self.results[
'pathIntegratedAttTopDown']
else:
raise ValueError('Do not understand applyAttenuation: %s. Must'
' be None, "bottomUp" or "topDown"' %
self.settings['applyAttenuation'])
else:
warnings.warn('No hydrometeors found. Calculating only gaseous '
'attenuation.')
for vv in self.results.variables:
self.results[vv].attrs['unit'] = units.units[vv]
return self.results
[docs] def _calcPIA(self):
absHydro = self._calcHydrometeorAbsorption()
absGas = self._calcGaseousAbsorption()
# Doviak Zrnic eq 3.12 with 10*np.log10(np.exp(X) = 4.34*X
specificAttenuation = 10*np.log10(np.exp(absHydro + absGas))
PIA_bottomup, PIA_topdown = self._attenuation2pia(specificAttenuation)
self.results['specificAttenuation'] = specificAttenuation
self.results['pathIntegratedAttBottomUp'] = PIA_bottomup
self.results['pathIntegratedAttTopDown'] = PIA_topdown
[docs] def _attenuation2pia(self, attenuation, dim='layer'):
attenuation = attenuation * self.parent.profile.heightBinDepth
attenuationRev = attenuation.isel(
layer=attenuation.coords[dim].values[::-1])
PIA_bottomup = attenuation.cumsum(dim) * 2 - attenuation
PIA_topdown = attenuationRev.cumsum(dim) * 2 - attenuationRev
PIA_topdown = PIA_topdown.isel(
layer=attenuation.coords[dim].values[::-1])
return PIA_bottomup, PIA_topdown
[docs]class dopplerRadarPamtra(simpleRadar):
def __init__(
self,
name='dopplerRadarPamtra',
parent=None,
frequencies='all',
radarMaxV=7.885,
radarMinV=-7.885,
radarAliasingNyquistInterv=4,
radarNFFT=256,
verbosity=0,
radarAirmotion=True,
radarAirmotionModel="constant", # "constant","linear","step"
radarAirmotionVmin=0,
radarAirmotionVmax=0,
radarAirmotionLinearSteps=30,
radarAirmotionStepVmin=0.5,
radarK2=0.93,
radarBeamwidthDeg=0.2,
radarIntegrationTime=60,
radarPNoise1000=-30,
radarNAve=150,
momentsNPeaks=2,
momentsNoiseDistanceFactor=0,
momentsSpecNoiseMean=None,
momentsSpecNoiseMax=None,
momentsPeakMinSnr=-10,
momentsPeakMinBins=2,
momentsSmoothSpectrum=True,
momentsUseWiderPeak=False,
momentsReceiverMiscalibration=0,
seed=0,
applyAttenuation=None,
gaseousAttenuationModel='Rosenkranz98',
):
super().__init__(
name=name,
parent=parent,
frequencies=frequencies,
radarMaxV=radarMaxV,
radarMinV=radarMinV,
radarAliasingNyquistInterv=radarAliasingNyquistInterv,
radarNFFT=radarNFFT,
verbosity=verbosity,
radarAirmotion=radarAirmotion,
radarAirmotionModel=radarAirmotionModel,
radarAirmotionVmin=radarAirmotionVmin,
radarAirmotionVmax=radarAirmotionVmax,
radarAirmotionLinearSteps=radarAirmotionLinearSteps,
radarAirmotionStepVmin=radarAirmotionStepVmin,
radarK2=radarK2,
radarPNoise1000=radarPNoise1000,
radarNAve=radarNAve,
radarBeamwidthDeg=radarBeamwidthDeg,
radarIntegrationTime=radarIntegrationTime,
seed=seed,
momentsNPeaks=momentsNPeaks,
momentsNoiseDistanceFactor=momentsNoiseDistanceFactor,
momentsSpecNoiseMean=momentsSpecNoiseMean,
momentsSpecNoiseMax=momentsSpecNoiseMax,
momentsPeakMinSnr=momentsPeakMinSnr,
momentsPeakMinBins=momentsPeakMinBins,
momentsSmoothSpectrum=momentsSmoothSpectrum,
momentsUseWiderPeak=momentsUseWiderPeak,
momentsReceiverMiscalibration=momentsReceiverMiscalibration,
applyAttenuation=applyAttenuation,
gaseousAttenuationModel=gaseousAttenuationModel,
)
[docs] def solve(self):
if len(self.frequencies) > 1:
warnings.warn('Note that the radar simulator '
'assumes that settings are the same for '
'all frequencies (due to '
'missing support for vectorized '
'settings.)')
self._link_parent()
for name in self.hydrometeorProfiles.keys():
if len(self.hydrometeorProfiles[name].sizeBin) <= 1:
raise ValueError('nbins must be greater than 1 for the'
'spectral radar simulator!')
self._calcPIA()
if self.parent.nHydrometeors > 0:
self._calcRadarSpectrum()
self._simulateRadar()
self._calcMoments()
for vv in self.results.variables:
self.results[vv].attrs['unit'] = units.units[vv]
return self.results
[docs] def _calcRadarSpectrum(self):
hydroVars = [
'sizeCenter',
'sizeBoundsWidth',
'backscatterCrossSection',
'fallVelocity',
'numberConcentration',
]
profileVars = [
'verticalWind',
'wavelength',
]
funcArgVars = [
'sizeCenter',
'sizeBoundsWidth',
'bcsWEIGHTED',
'fallVelocity',
] + profileVars
radarSpecs = []
for name in self.hydrometeorProfiles.keys():
# would be great if this could become an attribute of
# parent.hydrometeors[name].profile and we could use
# self.hydrometeorProfiles.
mergedProfile = self.parent.hydrometeors[
name
].getProfileWithParentAllBroadcasted(
variables=hydroVars,
parentVariables=profileVars,
exclude=['sizeBin'],
).sel(frequency=self.frequencies)
mergedProfile = mergedProfile.stack(
merged=helpers.concatDicts(
self.parent.coords['additional'],
self.parent.coords['layer'],
self.parent.coords['frequency']
)
)
mergedProfile['bcsWEIGHTED'] = (
mergedProfile['backscatterCrossSection'] *
mergedProfile['numberConcentration'].fillna(0)
)
argNames, kwargNames = helpers.provideArgKwargNames(
pyPamtraRadarSimulator.createRadarSpectrum)
args = []
for var in funcArgVars:
args.append(mergedProfile[var])
assert len(argNames) == len(args)
input_core_dims = helpers.getInputCoreDims(args, ['sizeBin'])
kwargs = {}
for k in kwargNames:
kwargs[k] = self.settings[k]
nfft = kwargs['radarNFFT'] * (
1 + 2*kwargs['radarAliasingNyquistInterv']
)
radarSpec = xr.apply_ufunc(
pyPamtraRadarSimulator.createRadarSpectrum,
*args,
kwargs=kwargs,
input_core_dims=input_core_dims,
output_core_dims=[('dopplerVelocityAliased',)],
output_dtypes=[mergedProfile.backscatterCrossSection.dtype],
output_sizes={'dopplerVelocityAliased': nfft},
dask='parallelized',
)
radarSpecs.append(radarSpec)
radarSpecs = xr.concat(radarSpecs, dim='hydrometeor')
radarSpecs = radarSpecs.sum('hydrometeor')
radarSpecs = xr.Dataset({'radarSpecs': radarSpecs})
radarSpecs = helpers.xrFastUnstack(radarSpecs, 'merged').radarSpecs
self.results['radarIdealizedSpectrum'] = radarSpecs
self.results['radarIdealizedSpectrum'].attrs['unit'] = units.units[
'radarIdealizedSpectrum']
return self.results['radarIdealizedSpectrum']
[docs] def _simulateRadar(self):
argNames, kwargNames = helpers.provideArgKwargNames(
pyPamtraRadarSimulator.simulateRadarSpectrum)
kwargs = {}
for k in kwargNames:
kwargs[k] = self.settings[k]
variables = [
'height',
'eddyDissipationRate',
'horizontalWind',
'radarIdealizedSpectrum',
'pathIntegratedAttenuation',
'wavelength',
]
mergedProfile = self.parent.profile.copy()
mergedProfile['radarIdealizedSpectrum'] = self.results[
'radarIdealizedSpectrum']
mergedProfile = mergedProfile.sel(frequency=self.frequencies)
if self.settings['applyAttenuation'] is None:
mergedProfile['pathIntegratedAttenuation'] = xr.zeros_like(
mergedProfile.height)
elif self.settings['applyAttenuation'] == 'bottomUp':
mergedProfile['pathIntegratedAttenuation'] = self.results[
'pathIntegratedAttBottomUp']
elif self.settings['applyAttenuation'] == 'topDown':
mergedProfile['pathIntegratedAttenuation'] = self.results[
'pathIntegratedAttTopDown']
else:
raise ValueError('Do not understand applyAttenuation: %s. Must be'
'None, "bottomUp" or "topDown"' %
self.settings['applyAttenuation'])
mergedProfile = mergedProfile.stack(
merged=helpers.concatDicts(
self.parent.coords['additional'],
self.parent.coords['layer'],
self.parent.coords['frequency'])
)
args = []
for var in variables:
args.append(mergedProfile[var])
assert len(argNames) == len(args)
input_core_dims = helpers.getInputCoreDims(
args, ['dopplerVelocityAliased'])
radarSpec = xr.apply_ufunc(
pyPamtraRadarSimulator.simulateRadarSpectrum,
*args,
kwargs=kwargs,
input_core_dims=input_core_dims,
output_core_dims=[('dopplerVelocity',)],
output_dtypes=[mergedProfile.radarIdealizedSpectrum.dtype],
output_sizes={'dopplerVelocity': 256},
dask='parallelized',
)
radarSpec = xr.Dataset({'radarSpec': radarSpec})
radarSpec = helpers.xrFastUnstack(radarSpec, 'merged').radarSpec
self.results['radarSpectrum'] = radarSpec.assign_coords(
dopplerVelocity=np.linspace(
self.settings['radarMinV'],
self.settings['radarMaxV'],
self.settings['radarNFFT'],
endpoint=False
)
)
return self.results['radarSpectrum']
[docs] def _calcMoments(self):
output_core_dims = [
('peak'),
('peak'),
('peak'),
('peak'),
('peak'),
('peak'),
('peak'),
('peak'),
('peak'),
tuple(),
tuple(),
]
output_names = [
'radarReflectivity',
'meanDopplerVel',
'spectrumWidth',
'skewness',
'kurtosis',
'leftSlope',
'rightSlope',
'leftEdge',
'rightEdge',
'quality',
'noiseMean',
]
output_sizes = {
'peak': self.settings['momentsNPeaks'],
}
# theseVars
input_core_dims = [['dopplerVelocity', ]]
args = [
self.results.radarSpectrum.sel(
frequency=self.frequencies
).stack(merged=helpers.concatDicts(
self.parent.coords['additional'],
self.parent.coords['layer'],
self.parent.coords['frequency']
)
)
]
# take care of settings
argNames, kwargNames = helpers.provideArgKwargNames(
pyPamtraRadarMoments.calc_radarMoments)
kwargs = {}
for k in kwargNames:
kwargs[k] = self.settings[k]
assert len(argNames) == len(args)
moments = helpers.apply_ufunc_extended(
_calc_radarMoments_wrapper,
*args,
kwargs=kwargs,
output_names=output_names,
input_core_dims=input_core_dims,
output_core_dims=output_core_dims,
output_dtypes=[args[0].dtype],
output_sizes=output_sizes,
dask='parallelized',
)
moments = helpers.xrFastUnstack(moments, 'merged')
moments = moments.assign_coords(
peak=np.arange(1, self.settings['momentsNPeaks']+1)
)
self.results = self.results.merge(moments)
return moments
[docs]def _calc_radarMoments_wrapper(*args, **kwargs):
result = pyPamtraRadarMoments.calc_radarMoments(
*args, **kwargs)
spectrumOut, moments, slope, edge, quality, noiseMean = result
moments[moments == -9999.] = np.nan
slope[slope == -9999.] = np.nan
edge[edge == -9999.] = np.nan
noiseMean[noiseMean == -9999.] = np.nan
radarReflectivity = 10 * np.log10(moments[:, 0, :])
meanDopplerVel = moments[:, 1, :]
spectrumWidth = moments[:, 2, :]
skewness = moments[:, 3, :]
kurtosis = moments[:, 4, :]
leftSlope = slope[:, 0, :]
rightSlope = slope[:, 1, :]
leftEdge = edge[:, 0, :]
rightEdge = edge[:, 1, :]
quality = quality.astype(moments.dtype)
# No = noiseMean
result = (radarReflectivity, meanDopplerVel, spectrumWidth, skewness,
kurtosis, leftSlope, rightSlope, leftEdge, rightEdge,
quality, noiseMean)
return result