#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
This module contains:
- a class to handle variance spectrum;
- a function to compute DCT spectrum from a 2D field;
- a function to sort spectra with regards to their name;
- a function to plot a series of spectra;
- a function to read a dumped spectrum;
- a function to create a spectral geometry, e.g., for GRIBs.
"""
from bronx.graphics.axes import set_figax
import numpy
import copy
from epygram import epygramError, config
from epygram.geometries import GaussGeometry, SpectralGeometry
from epygram.util import RecursiveObject, write_formatted_table
_file_id = 'epygram.spectra.Spectrum'
_file_columns = ['#', 'lambda', 'variance']
def nlonlat_to_nsmax(nlon, nlat, stretching, trunctype):
"""
Relationship between grid-point space and spectral space.
nlat is the number of latitudes, nlon is the maximum number of longitudes.
trunc_type should be one of "linear", "quadratic" or "cubic"
Returns the maximum total dimensionless wavenumber.
"""
trunctype2ratio = {"linear": 2, "quadratic": 3, "cubic": 4}
if trunctype not in trunctype2ratio.keys():
raise ValueError("trunc_type should be one of 'linear', 'quadratic' or 'cubic'")
ratio = trunctype2ratio[trunctype]
if stretching == 1.0:
return int(numpy.floor((nlon - 1) / ratio))
else:
return int(numpy.floor(min(nlon - 1, 2 * nlat - 3) / ratio))
def make_spectral_geometry(geometry, trunctype="linear", verbose=False):
"""
Returns a SpectralGeometry object consistent with the input grid-point
geometry and with the required truncation type.
This is only implemented for Gaussiand grids, with a triangular truncation.
"""
if trunctype not in ("linear", "quadratic", "cubic"):
raise ValueError("trunctype should be either 'linear', 'quadratic' or 'cubic'")
if not isinstance(geometry, GaussGeometry):
raise NotImplementedError(
"No meaningful spectral transform implemented for " + geometry.name
)
if verbose:
print(
f"Build spectral geometry assuming {trunctype} and triangular truncation."
)
stretching = geometry.grid["dilatation_coef"]
nlat = geometry.dimensions["lat_number"]
nlon = geometry.dimensions["max_lon_number"]
truncation = dict(
max=nlonlat_to_nsmax(nlon, nlat, stretching, trunctype),
shape="triangular",
)
spectral_geometry = SpectralGeometry("legendre", truncation)
if verbose:
print("Built spectral geometry:", spectral_geometry)
return spectral_geometry
def get_spectral_geometry(field, resource, verbose=False):
"""
Returns the SpectralGeometry object of the field or resource.
If the field has no spectral geometry, return the spectral geometry of the resource.
If the resource has no spectral geometry, returns None.
"""
spectral_geometry = None
if field.spectral_geometry is not None:
spectral_geometry = field.spectral_geometry
elif hasattr(resource, "spectral_geometry"):
spectral_geometry = resource.spectral_geometry
if verbose:
print("Spectral geometry is", spectral_geometry)
return spectral_geometry
[docs]
def read_Spectrum(filename):
"""Read a Spectrum written in file and return it."""
with open(filename, 'r') as _file:
init_kwargs = {}
# Spectrum file id
assert _file.readline()[:-1] == _file_id, \
' '.join(["file:", filename, "does not contain a Spectrum."])
# header: other stuff
line = _file.readline()[:-1]
while line[0] != '#':
init_kwargs[line.split('=')[0].strip()] = line.split('=')[1].strip()
line = _file.readline()[:-1]
# columns description
assert line.split() == _file_columns
# values
table = [line.split() for line in _file.readlines()]
if int(table[0][0]) == 0:
init_kwargs['mean2'] = float(table.pop(0)[2])
elif not int(table[0][0]) == 1:
raise epygramError("first wavenumber must be 0 or 1.")
if 'resolution' in init_kwargs:
init_kwargs['resolution'] = float(init_kwargs['resolution'])
else:
k = int(table[-1][0])
init_kwargs['resolution'] = float(table[-1][1]) * k / (2 * (k + 1))
variances = [float(line[2]) for line in table]
return Spectrum(variances, **init_kwargs)
[docs]
class Spectrum(RecursiveObject):
"""
A spectrum can be seen as a quantification of a signal's variance with
regards to scale.
If the signal is defined in physical space on N points, its spectral
representation will be a squared mean value (wavenumber 0) and variances for
N-1 wavenumbers.
For details and documentation, see
Denis et al. (2002) : 'Spectral Decomposition of Two-Dimensional
Atmospheric Fields on Limited-Area Domains
Using the Discrete Cosine Transform (DCT)'
"""
def __init__(self, variances,
name=None,
resolution=None,
mean2=None,
**kwargs):
"""
:param variances: variances of the spectrum, from wavenumber 1 to N-1.
:param name: an optional name for the spectrum.
:param resolution: an optional resolution for the field represented by
the spectrum. It is used to compute the according
wavelengths.
Resolution unit is arbitrary, to the will of the
user.
:param mean2: the optional mean^2 of the field, i.e. variance of
wavenumber 0 of the spectrum.
"""
self.variances = numpy.array(variances)
self.name = name
self.resolution = resolution
self.mean2 = mean2
for k, v in kwargs.items():
setattr(self, k, v)
@property
def wavenumbers(self):
"""Gets the wavenumbers of the spectrum."""
return numpy.arange(1, len(self.variances) + 1)
@property
def wavelengths(self):
"""Gets the wavelengths of the spectrum."""
K = len(self.variances) + 1
return numpy.array([2. * self.resolution * K / k
for k in self.wavenumbers])
[docs]
def write(self, out):
"""
Writes the spectrum with formatted output in *out*.
:param out: must be an output open file-like object.
"""
out.write(_file_id + '\n')
if self.name is not None:
out.write('name = ' + str(self.name) + '\n')
if self.resolution is not None:
out.write('resolution = ' + str(self.resolution) + '\n')
table = [_file_columns,
[0, '-', self.mean2]]
wn = self.wavenumbers
wl = self.wavelengths
var = self.variances
for k in range(len(var)):
table.append([wn[k], wl[k], var[k]])
write_formatted_table(out, table)
[docs]
def dump(self, filename):
"""
Writes the spectrum with formatted output in *filename*.
"""
with open(filename, 'w') as _file:
self.write(_file)
[docs]
def plotspectrum(self,
together_with=[],
over=(None, None),
slopes=[{'exp':-3, 'offset':1, 'label':'-3'},
{'exp':-5. / 3., 'offset':1, 'label':'-5/3'}],
zoom=None,
unit='SI',
title=None,
figsize=None,
takeover=False):
"""
Plot the spectrum.
:param together_with: another spectrum or list of spectra to plot on the
same ax.
Cf. function plotspectra() of this module for other arguments.
"""
if isinstance(together_with, Spectrum):
together_with = [together_with]
for s in together_with:
assert isinstance(s, Spectrum)
return plotspectra([self] + together_with,
over=over,
slopes=slopes,
zoom=zoom,
unit=unit,
title=title,
figsize=figsize)
##########
# internal
def _check_operands(self, other):
"""Check compatibility of both spectra."""
if isinstance(other, Spectrum):
assert all((len(self.variances) == len(other.variances),
self.resolution == other.resolution or
None in (self.resolution, other.resolution))), \
"operations between spectra require that they share dimension and resolution."
else:
try:
_ = float(other)
except (ValueError, TypeError) as e:
raise type(e)('*other* must be a Spectrum or a float-convertible.')
if isinstance(other, Spectrum):
othermean2 = other.mean2
othername = other.name
otherval = other.variances
else:
othermean2 = other
othername = str(other)
otherval = other
return (othermean2, othername, otherval)
def __add__(self, other):
(othermean2, othername, otherval) = self._check_operands(other)
mean2 = None if None in (self.mean2, othermean2) else self.mean2 + othermean2
name = None if (self.name is othername is None) else str(self.name) + '+' + str(othername)
return Spectrum(self.variances + otherval,
name=name,
resolution=self.resolution,
mean2=mean2)
def __sub__(self, other):
(othermean2, othername, otherval) = self._check_operands(other)
mean2 = None if None in (self.mean2, othermean2) else self.mean2 - othermean2
name = None if (self.name is othername is None) else str(self.name) + '+' + str(othername)
return Spectrum(self.variances - otherval,
name=name,
resolution=self.resolution,
mean2=mean2)
def __mul__(self, other):
(othermean2, othername, otherval) = self._check_operands(other)
mean2 = None if None in (self.mean2, othermean2) else self.mean2 * othermean2
name = None if (self.name is othername is None) else str(self.name) + '+' + str(othername)
return Spectrum(self.variances * otherval,
name=name,
resolution=self.resolution,
mean2=mean2)
def __div__(self, other):
(othermean2, othername, otherval) = self._check_operands(other)
mean2 = None if None in (self.mean2, othermean2) else self.mean2 / othermean2
name = None if (self.name is othername is None) else str(self.name) + '+' + str(othername)
return Spectrum(self.variances / otherval,
name=name,
resolution=self.resolution,
mean2=mean2)
# FUNCTIONS FOR SPECTRA #
#########################
[docs]
def sort(spectra):
""" Sort a list of spectra with regards to their name. """
untied_spectra = copy.copy(spectra)
sortedspectra = []
for f in sorted([s.name for s in untied_spectra], reverse=True):
for s in range(len(untied_spectra)):
if untied_spectra[s].name == f:
sortedspectra.append(untied_spectra.pop(s))
break
return sortedspectra
[docs]
def dctspectrum(x, verbose=False, log=None):
"""
Function *dctspectrum* takes a 2D-array as argument and returns its 1D
DCT ellipse spectrum.
For details and documentation, see
Denis et al. (2002) : 'Spectral Decomposition of Two-Dimensional
Atmospheric Fields on Limited-Area Domains Using
the Discrete Cosine Transform (DCT).'
:param verbose: verbose mode
:param log: an optional logging.Logger instance to which print info
in *verbose* case.
"""
import scipy.fftpack as tfm
# compute transform
if log is not None and verbose:
log.info("dctspectrum: compute DCT transform...")
norm = 'ortho' # None
y = tfm.dct(tfm.dct(x, norm=norm, axis=0), norm=norm, axis=1)
# compute spectrum
if log is not None and verbose:
log.info("dctspectrum: compute variance spectrum...")
N, M = y.shape
N2 = N ** 2
M2 = M ** 2
MN = M * N
K = min(M, N)
variance = numpy.zeros(K)
variance[0] = y[0, 0] ** 2 / MN
for j in range(0, N):
j2 = float(j) ** 2
for i in range(0, M):
var = y[j, i] ** 2 / MN
k = numpy.sqrt(float(i) ** 2 / M2 + j2 / N2) * K
k_inf = int(numpy.floor(k))
k_sup = k_inf + 1
weightsup = k - k_inf
weightinf = 1.0 - weightsup
if 0 <= k < 1:
variance[1] += weightsup * var
if 1 <= k < K - 1:
variance[k_inf] += weightinf * var
variance[k_sup] += weightsup * var
if K - 1 <= k < K:
variance[k_inf] += weightinf * var
return variance
[docs]
def global_spectrum(field):
"""
Return variance spectrum of a global spectral field on a GaussGeometry,
from its spectral coefficients.
"""
assert field.geometry.isglobal
assert isinstance(field.geometry, GaussGeometry)
assert field.spectral_geometry.truncation["shape"] == "triangular"
assert field.spectral
data = field.getdata()
nsmax = field.spectral_geometry.truncation["max"]
assert data.size == (nsmax+1) * (nsmax+2)
variances = numpy.zeros(nsmax + 1)
jdata = 0
# Loop on zonal wavenumber m
for m in range(nsmax + 1):
# Loop on total wavenumber n
for n in range(m, nsmax + 1):
squaredmodule = data[jdata] ** 2 + data[jdata+1] ** 2
if m == 0:
variances[n] += squaredmodule
# Check that coefficient (m, n) == (0, n) is its own complex conjugate,
# i.e. it has zero imaginary part
assert data[jdata+1] == 0
else:
# Coefficient (-m, n) is the complex conjugate of (m, n).
# It is not stored so (m, n) should be counted twice.
variances[n] += 2 * squaredmodule
jdata += 2
return variances
[docs]
def plotspectra(spectra,
over=(None, None),
slopes=[{'exp':-3, 'offset':1, 'label':'-3'},
{'exp':-5. / 3., 'offset':1, 'label':'-5/3'}],
zoom=None,
unit='SI',
title=None,
figsize=None,
takeover=False):
"""
To plot a series of spectra.
:param over: any existing figure and/or ax to be used for the
plot, given as a tuple (fig, ax), with None for
missing objects. *fig* is the frame of the
matplotlib figure, containing eventually several
subplots (axes); *ax* is the matplotlib axes on
which the drawing is done. When given (is not None),
these objects must be coherent, i.e. ax being one of
the fig axes.
:param spectra: a Spectrum instance or a list of.
:param unit: string accepting LaTeX-mathematical syntaxes
:param slopes: list of dict(
- exp=x where x is exposant of a A*k**-x slope
- offset=A where A is logscale offset in a A*k**-x slope;
a offset=1 is fitted to intercept the first spectra at wavenumber = 2
- label=(optional label) appearing 'k = label' in legend)
:param zoom: dict(xmin=,xmax=,ymin=,ymax=)
:param title: title for the plot
:param figsize: figure sizes in inches, e.g. (5, 8.5).
Default figsize is config.plotsizes.
:param takeover: give the user more access to the objects used in the
plot, by returning a dict containing them instead of only fig/ax
"""
import matplotlib.pyplot as plt
plt.rc('font', family='serif')
if figsize is None:
figsize = config.plotsizes
fig, ax = set_figax(*over, figsize=figsize)
result = {'fig':fig, 'ax':ax}
if isinstance(spectra, Spectrum):
spectra = [spectra]
# prepare dimensions
window = dict()
window['ymin'] = min([min(s.variances) for s in spectra]) / 10
window['ymax'] = max([max(s.variances) for s in spectra]) * 10
window['xmax'] = max([max(s.wavelengths) for s in spectra]) * 1.5
window['xmin'] = min([min(s.wavelengths) for s in spectra]) * 0.8
if zoom is not None:
for k, v in zoom.items():
window[k] = v
x1 = window['xmax']
x2 = window['xmin']
# colors and linestyles
colors = ['red', 'blue', 'green', 'orange', 'magenta', 'darkolivegreen',
'yellow', 'salmon', 'black']
linestyles = ['-', '--', '-.', ':']
# axes
if title is not None :
ax.set_title(title)
ax.set_yscale('log')
ax.set_ylim(window['ymin'], window['ymax'])
ax.set_xscale('log')
ax.set_xlim(window['xmax'], window['xmin'])
ax.grid()
ax.set_xlabel('wavelength ($km$)')
ax.set_ylabel(r'variance spectrum ($' + unit + '$)')
# plot slopes
# we take the second wavenumber (of first spectrum) as intercept, because
# it is often better fitted with spectrum than the first one
x_intercept = spectra[0].wavelengths[1]
y_intercept = spectra[0].variances[1]
i = 0
result['slopes'] = []
for slope in slopes:
# a slope is defined by y = A * k**-s and we plot it with
# two points y1, y2
try:
label = slope['label']
except KeyError:
# label = str(Fraction(slope['exp']).limit_denominator(10))
label = str(slope['exp'])
# because we plot w/r to wavelength, as opposed to wavenumber
s = -slope['exp']
A = y_intercept * x_intercept ** (-s) * slope['offset']
y1 = A * x1 ** s
y2 = A * x2 ** s
line = ax.plot([x1, x2], [y1, y2], color='0.7',
linestyle=linestyles[i % len(linestyles)],
label=r'$k^{' + label + '}$')
result['slopes'].append(line)
i += 1
# plot spectra
i = 0
result['spectra'] = []
for s in spectra:
line = ax.plot(s.wavelengths, s.variances, color=colors[i % len(colors)],
linestyle=linestyles[i // len(colors)], label=s.name)
result['spectra'].append(line)
i += 1
# legend
legend = ax.legend(loc='lower left', shadow=True)
result['legend'] = legend
for label in legend.get_texts():
label.set_fontsize('medium')
return result if takeover else (fig, ax)