#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
This module contains:
- a class to handle variance spectrum;
- a function to compute DCT spectrum from a 2D field;
- a function to sort spectra with regards to their name;
- a function to plot a series of spectra.
- a function to read a dumped spectrum.
"""
from __future__ import print_function, absolute_import, unicode_literals, division
from bronx.graphics.axes import set_figax
import numpy
import copy
from epygram import epygramError, config
from epygram.util import RecursiveObject, write_formatted_table
_file_id = 'epygram.spectra.Spectrum'
_file_columns = ['#', 'lambda', 'variance']
[docs]def read_Spectrum(filename):
"""Read a Spectrum written in file and return it."""
with open(filename, 'r') as _file:
init_kwargs = {}
# Spectrum file id
assert _file.readline()[:-1] == _file_id, \
' '.join(["file:", filename, "does not contain a Spectrum."])
# header: other stuff
line = _file.readline()[:-1]
while line[0] != '#':
init_kwargs[line.split('=')[0].strip()] = line.split('=')[1].strip()
line = _file.readline()[:-1]
# columns description
assert line.split() == _file_columns
# values
table = [line.split() for line in _file.readlines()]
if int(table[0][0]) == 0:
init_kwargs['mean2'] = float(table.pop(0)[2])
elif not int(table[0][0]) == 1:
raise epygramError("first wavenumber must be 0 or 1.")
if 'resolution' in init_kwargs:
init_kwargs['resolution'] = float(init_kwargs['resolution'])
else:
k = int(table[-1][0])
init_kwargs['resolution'] = float(table[-1][1]) * k / (2 * (k + 1))
variances = [float(line[2]) for line in table]
return Spectrum(variances, **init_kwargs)
[docs]class Spectrum(RecursiveObject):
"""
A spectrum can be seen as a quantification of a signal's variance with
regards to scale.
If the signal is defined in physical space on N points, its spectral
representation will be a squared mean value (wavenumber 0) and variances for
N-1 wavenumbers.
For details and documentation, see
Denis et al. (2002) : 'Spectral Decomposition of Two-Dimensional
Atmospheric Fields on Limited-Area Domains
Using the Discrete Cosine Transform (DCT)'
"""
def __init__(self, variances,
name=None,
resolution=None,
mean2=None,
**kwargs):
"""
:param variances: variances of the spectrum, from wavenumber 1 to N-1.
:param name: an optional name for the spectrum.
:param resolution: an optional resolution for the field represented by
the spectrum. It is used to compute the according
wavelengths.
Resolution unit is arbitrary, to the will of the
user.
:param mean2: the optional mean^2 of the field, i.e. variance of
wavenumber 0 of the spectrum.
"""
self.variances = numpy.array(variances)
self.name = name
self.resolution = resolution
self.mean2 = mean2
for k, v in kwargs.items():
setattr(self, k, v)
@property
def wavenumbers(self):
"""Gets the wavenumbers of the spectrum."""
return numpy.arange(1, len(self.variances) + 1)
@property
def wavelengths(self):
"""Gets the wavelengths of the spectrum."""
K = len(self.variances) + 1
return numpy.array([2. * self.resolution * K / k
for k in self.wavenumbers])
[docs] def write(self, out):
"""
Writes the spectrum with formatted output in *out*.
:param out: must be an output open file-like object.
"""
out.write(_file_id + '\n')
if self.name is not None:
out.write('name = ' + str(self.name) + '\n')
if self.resolution is not None:
out.write('resolution = ' + str(self.resolution) + '\n')
table = [_file_columns,
[0, '-', self.mean2]]
wn = self.wavenumbers
wl = self.wavelengths
var = self.variances
for k in range(len(var)):
table.append([wn[k], wl[k], var[k]])
write_formatted_table(out, table)
[docs] def dump(self, filename):
"""
Writes the spectrum with formatted output in *filename*.
"""
with open(filename, 'w') as _file:
self.write(_file)
[docs] def plotspectrum(self,
together_with=[],
over=(None, None),
slopes=[{'exp':-3, 'offset':1, 'label':'-3'},
{'exp':-5. / 3., 'offset':1, 'label':'-5/3'}],
zoom=None,
unit='SI',
title=None,
figsize=None):
"""
Plot the spectrum.
:param together_with: another spectrum or list of spectra to plot on the
same ax.
Cf. function plotspectra() of this module for other arguments.
"""
if isinstance(together_with, Spectrum):
together_with = [together_with]
for s in together_with:
assert isinstance(s, Spectrum)
return plotspectra([self] + together_with,
over=over,
slopes=slopes,
zoom=zoom,
unit=unit,
title=title,
figsize=figsize)
##########
# internal
def _check_operands(self, other):
"""Check compatibility of both spectra."""
if isinstance(other, Spectrum):
assert all((len(self.variances) == len(other.variances),
self.resolution == other.resolution or
None in (self.resolution, other.resolution))), \
"operations between spectra require that they share dimension and resolution."
else:
try:
_ = float(other)
except (ValueError, TypeError) as e:
raise type(e)('*other* must be a Spectrum or a float-convertible.')
if isinstance(other, Spectrum):
othermean2 = other.mean2
othername = other.name
otherval = other.variances
else:
othermean2 = other
othername = str(other)
otherval = other
return (othermean2, othername, otherval)
def __add__(self, other):
(othermean2, othername, otherval) = self._check_operands(other)
mean2 = None if None in (self.mean2, othermean2) else self.mean2 + othermean2
name = None if (self.name is othername is None) else str(self.name) + '+' + str(othername)
return Spectrum(self.variances + otherval,
name=name,
resolution=self.resolution,
mean2=mean2)
def __sub__(self, other):
(othermean2, othername, otherval) = self._check_operands(other)
mean2 = None if None in (self.mean2, othermean2) else self.mean2 - othermean2
name = None if (self.name is othername is None) else str(self.name) + '+' + str(othername)
return Spectrum(self.variances - otherval,
name=name,
resolution=self.resolution,
mean2=mean2)
def __mul__(self, other):
(othermean2, othername, otherval) = self._check_operands(other)
mean2 = None if None in (self.mean2, othermean2) else self.mean2 * othermean2
name = None if (self.name is othername is None) else str(self.name) + '+' + str(othername)
return Spectrum(self.variances * otherval,
name=name,
resolution=self.resolution,
mean2=mean2)
def __div__(self, other):
(othermean2, othername, otherval) = self._check_operands(other)
mean2 = None if None in (self.mean2, othermean2) else self.mean2 / othermean2
name = None if (self.name is othername is None) else str(self.name) + '+' + str(othername)
return Spectrum(self.variances / otherval,
name=name,
resolution=self.resolution,
mean2=mean2)
# FUNCTIONS FOR SPECTRA #
#########################
[docs]def sort(spectra):
""" Sort a list of spectra with regards to their name. """
untied_spectra = copy.copy(spectra)
sortedspectra = []
for f in sorted([s.name for s in untied_spectra], reverse=True):
for s in range(len(untied_spectra)):
if untied_spectra[s].name == f:
sortedspectra.append(untied_spectra.pop(s))
break
return sortedspectra
[docs]def dctspectrum(x, verbose=False, log=None):
"""
Function *dctspectrum* takes a 2D-array as argument and returns its 1D
DCT ellipse spectrum.
For details and documentation, see
Denis et al. (2002) : 'Spectral Decomposition of Two-Dimensional
Atmospheric Fields on Limited-Area Domains Using
the Discrete Cosine Transform (DCT).'
:param verbose: verbose mode
:param log: an optional logging.Logger instance to which print info
in *verbose* case.
"""
import scipy.fftpack as tfm
# compute transform
if log is not None and verbose:
log.info("dctspectrum: compute DCT transform...")
norm = 'ortho' # None
y = tfm.dct(tfm.dct(x, norm=norm, axis=0), norm=norm, axis=1)
# compute spectrum
if log is not None and verbose:
log.info("dctspectrum: compute variance spectrum...")
N, M = y.shape
N2 = N ** 2
M2 = M ** 2
MN = M * N
K = min(M, N)
variance = numpy.zeros(K)
variance[0] = y[0, 0] ** 2 / MN
for j in range(0, N):
j2 = float(j) ** 2
for i in range(0, M):
var = y[j, i] ** 2 / MN
k = numpy.sqrt(float(i) ** 2 / M2 + j2 / N2) * K
k_inf = int(numpy.floor(k))
k_sup = k_inf + 1
weightsup = k - k_inf
weightinf = 1.0 - weightsup
if 0 <= k < 1:
variance[1] += weightsup * var
if 1 <= k < K - 1:
variance[k_inf] += weightinf * var
variance[k_sup] += weightsup * var
if K - 1 <= k < K:
variance[k_inf] += weightinf * var
return variance
[docs]def plotspectra(spectra,
over=(None, None),
slopes=[{'exp':-3, 'offset':1, 'label':'-3'},
{'exp':-5. / 3., 'offset':1, 'label':'-5/3'}],
zoom=None,
unit='SI',
title=None,
figsize=None):
"""
To plot a series of spectra.
:param over: any existing figure and/or ax to be used for the
plot, given as a tuple (fig, ax), with None for
missing objects. *fig* is the frame of the
matplotlib figure, containing eventually several
subplots (axes); *ax* is the matplotlib axes on
which the drawing is done. When given (is not None),
these objects must be coherent, i.e. ax being one of
the fig axes.
:param spectra: a Spectrum instance or a list of.
:param unit: string accepting LaTeX-mathematical syntaxes
:param slopes: list of dict(
- exp=x where x is exposant of a A*k**-x slope
- offset=A where A is logscale offset in a A*k**-x slope;
a offset=1 is fitted to intercept the first spectra at wavenumber = 2
- label=(optional label) appearing 'k = label' in legend)
:param zoom: dict(xmin=,xmax=,ymin=,ymax=)
:param title: title for the plot
:param figsize: figure sizes in inches, e.g. (5, 8.5).
Default figsize is config.plotsizes.
"""
import matplotlib.pyplot as plt
plt.rc('font', family='serif')
if figsize is None:
figsize = config.plotsizes
fig, ax = set_figax(*over, figsize=figsize)
if isinstance(spectra, Spectrum):
spectra = [spectra]
# prepare dimensions
window = dict()
window['ymin'] = min([min(s.variances) for s in spectra]) / 10
window['ymax'] = max([max(s.variances) for s in spectra]) * 10
window['xmax'] = max([max(s.wavelengths) for s in spectra]) * 1.5
window['xmin'] = min([min(s.wavelengths) for s in spectra]) * 0.8
if zoom is not None:
for k, v in zoom.items():
window[k] = v
x1 = window['xmax']
x2 = window['xmin']
# colors and linestyles
colors = ['red', 'blue', 'green', 'orange', 'magenta', 'darkolivegreen',
'yellow', 'salmon', 'black']
linestyles = ['-', '--', '-.', ':']
# axes
if title is not None :
ax.set_title(title)
ax.set_yscale('log')
ax.set_ylim(window['ymin'], window['ymax'])
ax.set_xscale('log')
ax.set_xlim(window['xmax'], window['xmin'])
ax.grid()
ax.set_xlabel('wavelength ($km$)')
ax.set_ylabel(r'variance spectrum ($' + unit + '$)')
# plot slopes
# we take the second wavenumber (of first spectrum) as intercept, because
# it is often better fitted with spectrum than the first one
x_intercept = spectra[0].wavelengths[1]
y_intercept = spectra[0].variances[1]
i = 0
for slope in slopes:
# a slope is defined by y = A * k**-s and we plot it with
# two points y1, y2
try:
label = slope['label']
except KeyError:
# label = str(Fraction(slope['exp']).limit_denominator(10))
label = str(slope['exp'])
# because we plot w/r to wavelength, as opposed to wavenumber
s = -slope['exp']
A = y_intercept * x_intercept ** (-s) * slope['offset']
y1 = A * x1 ** s
y2 = A * x2 ** s
ax.plot([x1, x2], [y1, y2], color='0.7',
linestyle=linestyles[i % len(linestyles)],
label=r'$k^{' + label + '}$')
i += 1
# plot spectra
i = 0
for s in spectra:
ax.plot(s.wavelengths, s.variances, color=colors[i % len(colors)],
linestyle=linestyles[i // len(colors)], label=s.name)
i += 1
# legend
legend = ax.legend(loc='lower left', shadow=True)
for label in legend.get_texts():
label.set_fontsize('medium')
return (fig, ax)