#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains the classes that handle 3D fields.
"""
from __future__ import print_function, absolute_import, unicode_literals, division
import six
import copy
import numpy
import sys
import footprints
from footprints import FPDict, FPList, proxy as fpx
from bronx.graphics.axes import set_figax
from bronx.syntax.arrays import stretch_array
from epygram import epygramError, config
from epygram.util import (write_formatted, Angle,
degrees_nearest_mod,
as_numpy_array,
moveaxis)
from epygram.base import Field, FieldSet, FieldValidity, FieldValidityList, Resource
from epygram.geometries import (Geometry, SpectralGeometry, VGeometry, ProjectedGeometry,
RectangularGridGeometry, RegLLGeometry, UnstructuredGeometry)
class _D3CommonField(Field):
"""
3-Dimensions common field class.
"""
_collector = ('field',)
_abstract = True
_footprint = dict(
attr=dict(
structure=dict(
values=set(['3D']),
info="Type of Field geometry.")
)
)
@property
def spectral(self):
"""Returns True if the field is spectral."""
return self.spectral_geometry is not None and self.spectral_geometry != '__unknown__'
###########################
# ABOUT VERTICAL GEOMETRY #
###########################
def use_field_as_vcoord(self, field, force_kind=None, validity_check=True):
"""
Use values from the field as vertical coordinates.
One example of use: field1 is a temperature field
extracted from a resource on hybrid levels whereas
field2 is the pressure field expressed on the same
grid. field1.use_field_as_vcoord(field2) transforms
the vertical coordinate of field1 to obtain a field
on pressure levels.
:param field: must be a field on the same geometry and
validity as self that we will use to replace
the vertical coordinate. 'generic' fid must
allow to recognize the vertical coordinate
type (except if force_kind is set)
:param force_kind: if not None, is used as the vertical
coordinate kind instead of trying to
guess it from the fid
:param validity_check: if True (default) checks if validity
of current object and of field
parameter are the same
"""
if self.geometry != field.geometry:
raise epygramError("geometries must be identical")
if validity_check and self.validity != field.validity:
raise epygramError("validities must be identical, " + \
"or validity_check must be False")
if field.spectral:
raise epygramError("field must not be spectral")
self.geometry.vcoordinate = field.as_vcoordinate(force_kind)
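# Usage sketch of the docstring example above (hypothetical resource and
# fids; both fields must share geometry and validity):
#   temperature = resource.readfield(temperature_fid)  # T on hybrid levels
#   pressure = resource.readfield(pressure_fid)        # P on the same grid
#   temperature.use_field_as_vcoord(pressure)          # T now on pressure levels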
def as_vcoordinate(self, force_kind=None):
"""
Returns a VGeometry built from the values of the field.
Field must have a 'generic' fid allowing to
recognize the vertical coordinate type (except if
force_kind is set)
:param force_kind: if not None, is used as the vertical
coordinate kind instead of trying to
guess it from the fid
"""
if force_kind is None:
if 'generic' not in self.fid:
raise epygramError("fid must contain a 'generic' key or force_kind must be set")
for k in ['discipline', 'parameterCategory', 'parameterNumber']:
if k not in self.fid['generic']:
raise epygramError("fid['generic'] must contain the " + k + " key.")
fid = (self.fid['generic']['discipline'],
self.fid['generic']['parameterCategory'],
self.fid['generic']['parameterNumber'])
if fid == (0, 3, 0):
vcoord = 100
elif fid == (0, 3, 6):
vcoord = 102
else:
raise NotImplementedError("Do not know which kind of vertical coordinate fid corresponds to")
else:
vcoord = force_kind
levels = self.getdata().copy()
if vcoord == 100:
levels = levels / 100.
if len(self.validity) == 1:
# data is an array z, y, x with z being optional
if len(self.geometry.vcoordinate.levels) == 1:
# in case z dimension does not exist in data
levels = [levels]
else:
# data is an array t, z, y, x with z being optional
if len(self.geometry.vcoordinate.levels) == 1:
# in case z dimension does not exist in data
levels = [levels]
else:
# if z dimension exist, z and t dimensions must be exchanged
levels = levels.swapaxes(0, 1)
kwargs_vcoord = {'typeoffirstfixedsurface': vcoord,
'position_on_grid': self.geometry.vcoordinate.position_on_grid,
'levels': list(levels)
}
return VGeometry(**kwargs_vcoord)
##############
# ABOUT DATA #
##############
def getvalue_ll(self, lon=None, lat=None,
level=None,
k=None,
validity=None,
interpolation='nearest',
neighborinfo=False,
one=True,
external_distance=None):
"""
Returns the value of the field on point of coordinates
(*lon, lat, level*).
*lon* and *lat* may be list/numpy.array.
:param interpolation: if:\n
- 'nearest' (default), returns the value of the
nearest neighboring gridpoint;
- 'linear', computes and returns the field value
with bilinear or linear spline interpolation;
- 'cubic', computes and returns the field value
with cubic spline interpolation;
:param level: is the true level, not the index of the level. Depending on
the vertical coordinate, it may be expressed in Pa, m.
:param k: is the index of the level.
:param validity: is a FieldValidity or a FieldValidityList instance
:param neighborinfo: if set to **True**, returns a tuple
*(value, (lon, lat))*, with *(lon, lat)* being
the actual coordinates of the neighboring gridpoint
(only for *interpolation == 'nearest'*).
:param one: if False and len(lon) is 1, returns [value] instead of value.
:param external_distance: can be a dict containing the target point value
and an external field on the same grid as self, to which the distance
is computed within the 4 horizontally nearest points; e.g.
{'target_value':4810, 'external_field':an_H2DField_with_same_geometry}.
If so, the nearest point is selected with
distance = abs(target_value - external_field.data)
When the linear interpolation method is chosen, if the grid is rectangular,
the interpolation is bilinear on the model grid; otherwise it is a spline
interpolation in the lat/lon domain.
Warning: for interpolation on Gauss geometries, requires the
:mod:`pyproj` module.
"""
if self.spectral:
raise epygramError("field must be gridpoint to get value of a" +
" lon/lat point.")
if len(numpy.array(self.geometry.vcoordinate.levels).shape) == 1 and \
len(self.geometry.vcoordinate.levels) != len(set(self.geometry.vcoordinate.levels)):
raise epygramError('Some levels are represented twice in levels list.')
# We look for indexes for time coordinate (no interpolation)
if validity is None:
if len(self.validity) > 1:
raise epygramError("*validity* is mandatory when there are several validities")
t = 0
else:
if isinstance(validity, FieldValidity):
validity = FieldValidityList(validity)
t = [[ind for ind, val in enumerate(self.validity) if val == v][0] for v in validity]
t = as_numpy_array(t).flatten()
# We look for indexes for vertical coordinate (no interpolation)
if k is not None and level is not None:
raise epygramError("*level* and *k* cannot be different from None together")
elif k is level is None:
if self.geometry.datashape['k']:
raise epygramError("*level* or *k* is mandatory when field has a vertical coordinate")
k = 0
elif k is None:
# level is not None
k = [self.geometry.vcoordinate.levels.index(l) for l in as_numpy_array(level)]
k = as_numpy_array(k)
if lon is None or lat is None:
raise epygramError("*lon* and *lat* are mandatory")
lon, lat = as_numpy_array(lon).flatten(), as_numpy_array(lat).flatten()
if len(lon) != len(lat):
raise epygramError("*lon* and *lat* must have the same length")
sizes = set([len(x) for x in [lon, lat, k, t]])
if len(sizes) > 2 or (len(sizes) == 2 and 1 not in sizes):
raise epygramError("each of lon, lat, k/level and validity must be scalar or have the same length as the others")
if interpolation == 'linear':
if isinstance(self.geometry, RectangularGridGeometry):
method = 'bilinear'
else:
method = 'linear_spline'
else:
method = interpolation
if method == 'nearest':
(ri, rj) = moveaxis(numpy.array(self.geometry.nearest_points(lon, lat, {'n':'1'},
external_distance=external_distance)), 0, -1)
value = self.getvalue_ij(ri, rj, k, t, one=one)
if neighborinfo:
(lon, lat) = self.geometry.ij2ll(ri, rj)
if numpy.shape(lon) in ((1,), ()):
lon = float(lon)
lat = float(lat)
value = (value, (lon, lat))
elif method in ('linear_spline', 'cubic', 'bilinear'):
lon = lon if len(lon) == max(sizes) else lon.repeat(max(sizes))
lat = lat if len(lat) == max(sizes) else lat.repeat(max(sizes))
k = k if len(k) == max(sizes) else k.repeat(max(sizes))
t = t if len(t) == max(sizes) else t.repeat(max(sizes))
value = numpy.zeros(max(sizes))
interp_points = self.geometry.nearest_points(lon, lat,
{'linear':{'n':'2*2'},
'linear_spline':{'n':'2*2'},
'bilinear':{'n':'2*2'},
'cubic':{'n':'4*4'}}[interpolation],
squeeze=False)
# unpack
all_i = interp_points[:, :, 0]
all_j = interp_points[:, :, 1]
all_k = numpy.repeat(k[:, numpy.newaxis], interp_points.shape[1], axis=1)
all_t = numpy.repeat(t[:, numpy.newaxis], interp_points.shape[1], axis=1)
# get values and lons/lats
values_at_interp_points = self.getvalue_ij(all_i.flatten(),
all_j.flatten(),
all_k.flatten(),
all_t.flatten()).reshape(all_i.shape)
if method in ('linear_spline', 'cubic'):
from scipy.interpolate import interp1d, interp2d
all_lons, all_lats = self.geometry.ij2ll(all_i.flatten(), all_j.flatten())
all_lons = all_lons.reshape(all_i.shape)
all_lats = all_lats.reshape(all_i.shape)
for n in range(max(sizes)):
if self.geometry.name == 'academic' and \
1 in (self.geometry.dimensions['X'], self.geometry.dimensions['Y']):
if self.geometry.dimensions['X'] == 1:
f = interp1d(all_lats[n], values_at_interp_points[n], kind=interpolation)
value[n] = f(lat[n])
else:
f = interp1d(all_lons[n], values_at_interp_points[n], kind=interpolation)
value[n] = f(lon[n])
else:
f = interp2d(all_lons[n], all_lats[n], values_at_interp_points[n], kind=interpolation)
value[n] = f(as_numpy_array(lon)[n], as_numpy_array(lat)[n])
elif method == 'bilinear':
def simple_inter(x1, q1, x2, q2, x):
"""Simple linear interpolant"""
return (x2 - x) / (x2 - x1) * q1 + (x - x1) / (x2 - x1) * q2
if 'gauss' in self.geometry.name:
test = numpy.all(all_j[:, 0] == all_j[:, 1]) and numpy.all(all_j[:, 2] == all_j[:, 3])
if not test:
raise RuntimeError("Points are not in the awaited order...")
lonrs, latrs = self.geometry._rotate_stretch(lon, lat) #waited point
lonrs_near, latrs_near = self.geometry._rotate_stretch(*self.geometry.ij2ll(all_i.flatten(), all_j.flatten()))
lonrs_near, latrs_near = lonrs_near.reshape(all_i.shape), latrs_near.reshape(all_i.shape)
value = simple_inter(latrs_near[:, 0], simple_inter(lonrs_near[:, 0], values_at_interp_points[:, 0],
lonrs_near[:, 1], values_at_interp_points[:, 1], lonrs),
latrs_near[:, 2], simple_inter(lonrs_near[:, 2], values_at_interp_points[:, 2],
lonrs_near[:, 3], values_at_interp_points[:, 3], lonrs),
latrs)
else:
square = numpy.all(all_i[:, 0] == all_i[:, 1]) and numpy.all(all_i[:, 2] == all_i[:, 3]) and \
numpy.all(all_j[:, 0] == all_j[:, 2]) and numpy.all(all_j[:, 1] == all_j[:, 3])
if not square:
raise NotImplementedError("Bilinear interpolation on non gaussian, irregular grid")
# Position of wanted point
wanted_i, wanted_j = self.geometry.ll2ij(lon, lat)
value = simple_inter(all_j[:, 0], simple_inter(all_i[:, 0], values_at_interp_points[:, 0],
all_i[:, 2], values_at_interp_points[:, 2], wanted_i),
all_j[:, 1], simple_inter(all_i[:, 1], values_at_interp_points[:, 1],
all_i[:, 3], values_at_interp_points[:, 3], wanted_i),
wanted_j)
else:
raise NotImplementedError('*interpolation*=' + interpolation)
if one:
try:
value = float(value)
except (ValueError, TypeError):
pass
return copy.copy(value)
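# Usage sketch (hypothetical gridpoint field with a single validity and no
# vertical dimension):
#   value = field.getvalue_ll(lon=2.35, lat=48.85, interpolation='linear')
#   value, (nlon, nlat) = field.getvalue_ll(2.35, 48.85, neighborinfo=True)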
def as_lists(self, order='C', subzone=None):
"""
Export values as a dict of lists (in fact numpy arrays).
:param order: whether to flatten arrays in 'C' (row-major) or
'F' (Fortran, column-major) order.
:param subzone: defines the LAM subzone to be included, in LAM case,
among: 'C', 'CI'.
"""
if self.spectral:
raise epygramError("as_lists method needs a grid-point field, not a spectral one.")
lons4d, lats4d = self.geometry.get_lonlat_grid(d4=True, nb_validities=len(self.validity), subzone=subzone)
levels4d = self.geometry.get_levels(d4=True, nb_validities=len(self.validity), subzone=subzone)
data4d = self.getdata(d4=True, subzone=subzone)
dates4d = []
times4d = []
for i, t in enumerate(self.validity):
dates4d += [t.get().year * 10000 + t.get().month * 100 + t.get().day] * levels4d[i].size
times4d += [t.get().hour * 100 + t.get().minute] * levels4d[i].size
dates4d = numpy.array(dates4d).reshape(data4d.shape).flatten(order=order)
times4d = numpy.array(times4d).reshape(data4d.shape).flatten(order=order)
result = dict(values=data4d.flatten(order=order),
latitudes=lats4d.flatten(order=order),
longitudes=lons4d.flatten(order=order),
levels=levels4d.flatten(order=order),
dates=dates4d.flatten(order=order),
times=times4d.flatten(order=order))
return result
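# Usage sketch (hypothetical field object): the returned arrays are flat
# and aligned element-wise, e.g. for a simple point-by-point dump:
#   d = field.as_lists()
#   for lon, lat, v in zip(d['longitudes'], d['latitudes'], d['values']):
#       print(lon, lat, v)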
def as_dicts(self, subzone=None):
"""
Export values as a list of dicts.
:param subzone: defines the LAM subzone to be included, in LAM case,
among: 'C', 'CI'.
"""
if self.spectral:
raise epygramError("as_dicts method needs a grid-point field, not a spectral one.")
lons, lats = self.geometry.get_lonlat_grid(subzone=subzone)
data4d = self.getdata(d4=True, subzone=subzone)
levels4d = self.geometry.get_levels(d4=True, nb_validities=len(self.validity), subzone=subzone)
result = []
for t in range(data4d.shape[0]):
validity = self.validity[t]
date = validity.get().year * 10000 + validity.get().month * 100 + validity.get().day
time = validity.get().hour * 100 + validity.get().minute
for k in range(data4d.shape[1]):
for j in range(data4d.shape[2]):
for i in range(data4d.shape[3]):
result.append(dict(value=data4d[t, k, j, i],
date=date,
time=time,
latitude=lats[j, i],
longitude=lons[j, i],
level=levels4d[t, k, j, i]))
return result
def as_points(self, subzone=None):
"""
Export values as a fieldset of points.
:param subzone: defines the LAM subzone to be included, in LAM case,
among: 'C', 'CI'.
"""
if self.spectral:
raise epygramError("as_points method needs a grid-point field, not a spectral one.")
field_builder = fpx.field
lons, lats = self.geometry.get_lonlat_grid(subzone=subzone)
data4d = self.getdata(d4=True, subzone=subzone)
levels4d = self.geometry.get_levels(d4=True, nb_validities=len(self.validity), subzone=subzone)
result = FieldSet()
vcoordinate = self.geometry.vcoordinate.deepcopy()
for t in range(data4d.shape[0]):
validity = self.validity[t]
for k in range(data4d.shape[1]):
for j in range(data4d.shape[2]):
for i in range(data4d.shape[3]):
vcoordinate.levels = [levels4d[t, k, j, i]]
geometry = UnstructuredGeometry(name='unstructured',
dimensions={'X':1, 'Y':1},
vcoordinate=vcoordinate.deepcopy(),
grid={'longitudes':[lons[j, i]],
'latitudes':[lats[j, i]]},
position_on_horizontal_grid='center'
)
pointfield = field_builder(structure='Point',
fid=dict(copy.deepcopy(self.fid)),
geometry=geometry,
validity=validity.copy())
pointfield.setdata(data4d[t, k, j, i])
result.append(pointfield)
return result
def as_profiles(self, subzone=None):
"""
Export values as a fieldset of profiles.
:param subzone: defines the LAM subzone to be included, in LAM case,
among: 'C', 'CI'.
"""
if self.spectral:
raise epygramError("as_profiles method needs a grid-point field, not a spectral one.")
field_builder = fpx.field
lons4d, lats4d = self.geometry.get_lonlat_grid(subzone=subzone, d4=True, nb_validities=1)
data4d = self.getdata(d4=True, subzone=subzone)
levels4d = self.geometry.get_levels(d4=True, nb_validities=len(self.validity), subzone=subzone)
result = FieldSet()
vcoordinate = self.geometry.vcoordinate.deepcopy()
for j in range(data4d.shape[2]):
for i in range(data4d.shape[3]):
if all([numpy.all(levels4d[0, :, j, i] == levels4d[t, :, j, i]) for t in range(len(self.validity))]):
vcoordinate.levels = list(levels4d[0, :, j, i])
else:
vcoordinate.levels = list(levels4d[:, :, j, i].swapaxes(0, 1))
geometry = UnstructuredGeometry(name='unstructured',
dimensions={'X':1, 'Y':1},
vcoordinate=vcoordinate.deepcopy(),
grid={'longitudes':[lons4d[0, 0, j, i].tolist()],
'latitudes':[lats4d[0, 0, j, i].tolist()],
},
position_on_horizontal_grid='center',
)
profilefield = field_builder(structure='V1D',
fid=dict(copy.deepcopy(self.fid)),
geometry=geometry,
validity=self.validity.copy())
profilefield.setdata(data4d[:, :, j, i])
result.append(profilefield)
return result
def extract_subdomain(self, geometry,
interpolation='nearest',
external_distance=None,
getdata=True):
"""
Extracts a subdomain from a field, given a new geometry.
:param geometry: defines the geometry on which extract data
:param interpolation: defines the interpolation function used to compute
the profile at requested lon/lat from the fields
grid:\n
- if 'nearest' (default), extracts profile at the horizontal nearest
neighboring gridpoint;
- if 'linear', computes profile with horizontal linear interpolation;
- if 'cubic', computes profile with horizontal cubic interpolation.
:param external_distance: can be a dict containing the target point value
and an external field on the same grid as self, to which the distance
is computed within the 4 horizontally nearest points; e.g.
{'target_value':4810, 'external_field':an_H2DField_with_same_geometry}.
If so, the nearest point is selected with
distance = abs(target_value - external_field.data)
:param getdata: if False returns a field without data
"""
# build subdomain fid
subdomainfid = {key:(FPDict(value)
if isinstance(value, dict)
else value)
for (key, value) in self.fid.items()}
# build vertical geometry
# k_index: ordered list of the level indexes of self used to build the new field
kwargs_vcoord = {'typeoffirstfixedsurface': self.geometry.vcoordinate.typeoffirstfixedsurface,
'position_on_grid': self.geometry.vcoordinate.position_on_grid}
if self.geometry.vcoordinate.typeoffirstfixedsurface == 119:
kwargs_vcoord['grid'] = copy.copy(self.geometry.vcoordinate.grid)
kwargs_vcoord['levels'] = copy.copy(self.geometry.vcoordinate.levels)
k_index = range(len(kwargs_vcoord['levels']))
elif self.geometry.vcoordinate.typeoffirstfixedsurface == 118:
kwargs_vcoord['grid'] = copy.copy(self.geometry.vcoordinate.grid)
kwargs_vcoord['levels'] = copy.copy(self.geometry.vcoordinate.levels)
k_index = list(range(len(kwargs_vcoord['levels'])))
elif self.geometry.vcoordinate.typeoffirstfixedsurface in [100, 102, 103, 109, 1, 106, 255, 160, 200]:
kwargs_vcoord['levels'] = list(copy.deepcopy(self.geometry.vcoordinate.levels))
k_index = range(len(kwargs_vcoord['levels']))
else:
raise NotImplementedError("type of first surface level: " + str(self.geometry.vcoordinate.typeoffirstfixedsurface))
if geometry.vcoordinate.typeoffirstfixedsurface not in [255, kwargs_vcoord['typeoffirstfixedsurface']]:
raise epygramError("extract_subdomain cannot change vertical coordinate.")
if geometry.vcoordinate.position_on_grid not in [None, '__unknown__', kwargs_vcoord['position_on_grid']]:
raise epygramError("extract_subdomain cannot change position on vertical grid.")
if geometry.vcoordinate.grid != {} and geometry.vcoordinate.grid != kwargs_vcoord.get('grid'):
raise epygramError("extract_subdomain cannot change vertical grid")
if geometry.vcoordinate.levels != []:
k_index = []
for level in geometry.vcoordinate.levels:
if level not in kwargs_vcoord['levels']:
raise epygramError("extract_subdomain cannot do vertical interpolations.")
k_index.append(self.geometry.vcoordinate.levels.index(level))
kwargs_vcoord['levels'] = geometry.vcoordinate.levels
vcoordinate = VGeometry(**kwargs_vcoord)
# build geometry
newgeometry = geometry.deepcopy()
newgeometry.vcoordinate = vcoordinate
newgeometry.position_on_horizontal_grid = 'center'
if geometry.position_on_horizontal_grid not in [None, '__unknown__', newgeometry.position_on_horizontal_grid]:
raise epygramError("extract_subdomain cannot deal with position_on_horizontal_grid other than 'center'")
if any([isinstance(level, numpy.ndarray) for level in newgeometry.vcoordinate.levels]):
# level value is not constant on the domain, we must extract a subdomain.
# We build a new field with same structure/geometry/validity as self
# and we fill its data with the level values. We replace levels by index
# in the geometry to stop recursion and use the extract_subdomain method
# to get the level values on the new geometry levels.
temp_field = self.geometry.vcoord_as_field(self.geometry.vcoordinate.typeoffirstfixedsurface,
self.validity)
temp_geom = geometry.deepcopy()
del temp_geom.vcoordinate.levels[:]
temp_geom.vcoordinate.levels.extend(k_index) # We extract only the requested levels
extracted = temp_field.extract_subdomain(temp_geom,
interpolation=interpolation,
external_distance=external_distance,
getdata=True).getdata(d4=True)
extracted = extracted.swapaxes(0, 1) # z first, then validity for levels (opposite of fields)
if len(self.validity) > 1:
# We keep all dims when validity is not unique
extracted = list(extracted)
else:
# We suppress the validity dimension
extracted = list(extracted[:, 0, ...])
del newgeometry.vcoordinate.levels[:]
newgeometry.vcoordinate.levels.extend(extracted)
del temp_field, temp_geom
# location & interpolation
lons, lats = newgeometry.get_lonlat_grid()
if len(lons.shape) > 1:
lons = stretch_array(lons)
lats = stretch_array(lats)
if not numpy.all(self.geometry.point_is_inside_domain_ll(lons, lats)):
for (lon, lat) in numpy.nditer([lons, lats]):
if not self.geometry.point_is_inside_domain_ll(lon, lat):
raise ValueError("point (" + str(lon) + ", " +
str(lat) + ") is out of field domain.")
comment = None
if interpolation == 'nearest':
if lons.size == 1:
true_loc = self.geometry.ij2ll(*self.geometry.nearest_points(lons, lats,
{'n':'1'},
external_distance=external_distance))
distance = self.geometry.distance((float(lons), float(lats)),
(float(true_loc[0]),
float(true_loc[1])))
az = self.geometry.azimuth((float(lons), float(lats)),
(float(true_loc[0]),
float(true_loc[1])))
if -22.5 < az <= 22.5:
direction = 'N'
elif -77.5 < az <= -22.5:
direction = 'NW'
elif -112.5 < az <= -77.5:
direction = 'W'
elif -157.5 < az <= -112.5:
direction = 'SW'
elif -180.5 <= az <= -157.5 or 157.5 < az <= 180.:
direction = 'S'
elif 22.5 < az <= 77.5:
direction = 'NE'
elif 77.5 < az <= 112.5:
direction = 'E'
elif 112.5 < az <= 157.5:
direction = 'SE'
gridpointstr = "(" + \
'{:.{precision}{type}}'.format(float(true_loc[0]),
type='F',
precision=4) + \
", " + \
'{:.{precision}{type}}'.format(float(true_loc[1]),
type='F',
precision=4) + \
")"
comment = "Profile @ " + str(int(distance)) + "m " + \
direction + " from " + str((float(lons), float(lats))) + \
"\n" + "( = nearest gridpoint: " + gridpointstr + ")"
elif interpolation in ('linear', 'cubic', 'bilinear'):
if interpolation == 'linear':
interpstr = 'linearly'
elif interpolation == 'cubic':
interpstr = 'cubically'
elif interpolation == 'bilinear':
interpstr = 'bilinearly'
if lons.size == 1:
comment = "Profile " + interpstr + " interpolated @ " + \
str((float(lons), float(lats)))
else:
raise NotImplementedError(interpolation + " interpolation.")
# Values
if getdata:
shp = newgeometry.get_datashape(dimT=len(self.validity), d4=True)
data = numpy.ndarray(shp)
for t in range(len(self.validity)):
#This is the old version, to delete later
#for k in range(len(newgeometry.vcoordinate.levels)):
# # level = newgeometry.vcoordinate.levels[k]
# extracted = self.getvalue_ll(lons, lats,
# # level=level, # equivalent to k=k_index[k] except that the k option
# k=k_index[k], # still works when level is an array
# validity=self.validity[t],
# interpolation=interpolation,
# external_distance=external_distance,
# one=False)
# data[t, k, :, :] = newgeometry.reshape_data(extracted)
data[t, ...] = self.getvalue_ll(as_numpy_array(lons)[numpy.newaxis, :].repeat(len(k_index), axis=0).flatten(),
as_numpy_array(lats)[numpy.newaxis, :].repeat(len(k_index), axis=0).flatten(),
k=as_numpy_array(k_index).repeat(lons.size),
validity=self.validity[t],
interpolation=interpolation,
external_distance=external_distance,
one=False).reshape(shp[1:])
# Field
newfield = fpx.field(fid=FPDict(subdomainfid),
structure=newgeometry.structure,
geometry=newgeometry,
validity=self.validity.deepcopy(),
processtype=self.processtype,
comment=comment)
if getdata:
newfield.setdata(data)
return newfield
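# Usage sketch (hypothetical objects): given *point_geom*, a 1x1
# UnstructuredGeometry built at a target lon/lat (cf. as_profiles() above
# for how such geometries are built), extract an interpolated profile:
#   profile = field.extract_subdomain(point_geom, interpolation='linear')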
#This method is intended to replace the extract_zoom one,
#but is still slower at execution.
#def _extract_zoom_with_subdo(self, zoom, extra_10th=False):
# """
# Extract an unstructured field with the gridpoints contained in *zoom*.
#
# :param zoom: a dict(lonmin=, lonmax=, latmin=, latmax=).
# :param extra_10th: if True, add 1/10th of the X/Y extension of the zoom
# (regular_lonlat grid case only).
# """
# assert not self.spectral, \
# "spectral field: convert to gridpoint beforehand"
#
# zoom_geom = self.geometry.make_zoom_geometry(zoom, extra_10th)
# if any([isinstance(level, numpy.ndarray) for level in self.geometry.vcoordinate.levels]):
# del zoom_geom.vcoordinate.levels[:]
# zoom_field = self.extract_subdomain(zoom_geom, interpolation='nearest')
#
# return zoom_field
def shave(self, minval=None, maxval=None):
"""
Shave data: cut values greater (resp. lower) than **maxval** to
**maxval** (resp. **minval**).
:param maxval: upper threshold
:param minval: lower threshold
"""
data = self.getdata()
if isinstance(data, numpy.ma.masked_array):
if maxval is not None:
data.data[data.data > maxval] = maxval
if minval is not None:
data.data[data.data < minval] = minval
else:
if maxval is not None:
data[data > maxval] = maxval
if minval is not None:
data[data < minval] = minval
self.data = data
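# Usage sketch (hypothetical field object): clip relative humidity to
# [0, 100] in place:
#   rh_field.shave(minval=0., maxval=100.)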
def resample(self, target_geometry,
weighting='nearest',
subzone=None,
neighbour_info=None,
# neighbouring options
radius_of_influence=None,
neighbours=8,
epsilon=0,
reduce_data=True,
nprocs=1,
segments=None,
# resampling options
fill_value=None,
sigma=None,
with_uncert=False):
"""
Resample data (interpolate) on a **target geometry**, using pyresample.
:param weighting: among ['nearest',
'gauss' (in which case sigma can be specified,
cf. sigma option below),
lambda r:1/r**2 (or other function, r in m)]
:param subzone: restrain source field to LAMzone in LAM geometry case
:param neighbour_info: - if True: skips the resampling, only compute
neighbours and return this
information as a tuple;
- if given as a tuple, skips the computation of
neighbours, taking those information
given in argument (from a former call with
neighbour_info=True)
Options from pyresample, neighbouring:
:param radius_of_influence: Cut off distance in meters, default value
computed from geometry
:param neighbours: The number of neigbours to consider for each grid
point
:param epsilon: Allowed uncertainty in meters. Increasing uncertainty
reduces execution time
:param reduce_data: Perform initial coarse reduction of source dataset
in order to reduce execution time
:param nprocs: Number of processor cores to be used
:param segments: Number of segments to use when resampling.
If set to None an estimate will be calculated
Options from pyresample, resampling:
:param fill_value: Set undetermined pixels to this value.
If fill_value is None a masked array is returned
with undetermined pixels masked
:param sigma: sigma (in m) to use for the gauss weighting
w = exp(-dist^2/sigma^2)
Default is twice the resolution.
:param with_uncert: Calculate uncertainty estimates (returned as side
fields)
"""
assert not self.spectral, "field must be gridpoint"
try:
from pyresample.geometry import GridDefinition, SwathDefinition
from pyresample.kd_tree import (get_neighbour_info,
get_sample_from_neighbour_info)
except ImportError:
raise ImportError("""pyresample is not installed.
You can install it locally using:\n
'pip install --user pyresample'""")
# PART 0: computation of parameters for PARTS 1 & 2
lons, lats = target_geometry.get_lonlat_grid(subzone=subzone, d4=True, nb_validities=1,
force_longitudes=']-180,180]')
lons = lons[0, 0, :, :]
lats = lats[0, 0, :, :]
if isinstance(lons, numpy.ma.MaskedArray) and lons.mask.any():
reduce_data = False # functionality does not work properly
target_geo = GridDefinition(lons, lats)
def _resolution():
if 'gauss' in self.geometry.name:
# gauss grid case
resolution = self.geometry.resolution_j(self.geometry.dimensions['lat_number'] - 1)
elif self.geometry.name in ('regular_lonlat', 'unstructured'):
resolution = self.geometry.resolution_ij(self.geometry.dimensions['X'] / 2,
self.geometry.dimensions['Y'] / 2)
else:
resolution = (self.geometry.grid.get('X_resolution') +
self.geometry.grid.get('Y_resolution')) / 2.
return resolution
# PART 1: computation of neighbours
if neighbour_info in (True, None, False):
lons, lats = self.geometry.get_lonlat_grid(subzone=subzone, d4=True, nb_validities=1,
force_longitudes=']-180,180]')
lons = lons[0, 0, :, :]
lats = lats[0, 0, :, :]
if isinstance(lons, numpy.ma.MaskedArray) and lons.mask.any():
reduce_data = False # functionality does not work properly
source_geo = GridDefinition(lons, lats)
if radius_of_influence is None:
radius_of_influence = 4. * _resolution()
if weighting == 'nearest':
neighbours = 1
(valid_input_index,
valid_output_index,
index_array,
distance_array) = get_neighbour_info(source_geo,
target_geo,
radius_of_influence=radius_of_influence,
neighbours=neighbours,
epsilon=epsilon,
reduce_data=reduce_data,
nprocs=nprocs,
segments=segments)
if neighbour_info is True:
return (valid_input_index,
valid_output_index,
index_array,
distance_array)
else: # neighbour_info is given: tuple of information about neighbours
assert len(neighbour_info) == 4
(valid_input_index,
valid_output_index,
index_array,
distance_array) = neighbour_info
# PART 2: computation of resampling
source_data = self.getdata(d4=True, subzone=subzone)
resampled_data = numpy.ma.zeros((source_data.shape[0], # t
source_data.shape[1], # z
target_geo.shape[0], # y
target_geo.shape[1])) # x
if with_uncert:
stddev = numpy.ma.zeros((source_data.shape[0], # t
source_data.shape[1], # z
target_geo.shape[0], # y
target_geo.shape[1])) # x
counts = numpy.ma.zeros((source_data.shape[0], # t
source_data.shape[1], # z
target_geo.shape[0], # y
target_geo.shape[1])) # x
if callable(weighting): # custom: weighting function
weight_funcs = weighting
weighting = 'custom'
elif weighting == 'nearest':
weighting = 'nn'
weight_funcs = None
elif weighting == 'gauss':
def gauss(s):
return lambda r: numpy.exp(-r ** 2 / s ** 2)
weighting = 'custom'
if sigma is None:
sigma = 2. * _resolution()
else:
sigma = float(sigma)
weight_funcs = gauss(sigma)
else:
raise epygramError("unknown weighting='" + str(weighting))
for t in range(source_data.shape[0]):
for z in range(source_data.shape[1]):
rdata = get_sample_from_neighbour_info(weighting,
target_geo.shape,
source_data[t, z, :, :],
valid_input_index,
valid_output_index,
index_array,
distance_array,
weight_funcs=weight_funcs,
fill_value=fill_value,
with_uncert=with_uncert)
if with_uncert:
resampled_data[t, z, :, :] = rdata[0][:, :]
stddev[t, z, :, :] = rdata[1][:, :]
counts[t, z, :, :] = rdata[2][:, :]
else:
resampled_data[t, z, :, :] = rdata[:, :]
# build final field
field_kwargs = copy.deepcopy(self._attributes)
field_kwargs['geometry'] = target_geometry
newfield = fpx.field(**field_kwargs)
if not resampled_data.mask.any():
resampled_data = resampled_data.data
newfield.setdata(resampled_data)
if with_uncert:
stddev_field = newfield.deepcopy()
stddev_field.setdata(stddev)
stddev_field.comment += ':stddev_from_resampling'
counts_field = newfield.deepcopy()
counts_field.setdata(counts)
counts_field.comment += ':number_of_points_from_resampling'
return newfield, stddev_field, counts_field
else:
return newfield
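# Usage sketch of the *neighbour_info* two-step pattern described above:
# when resampling several fields sharing the same source/target geometries,
# neighbours can be computed once and reused (hypothetical objects):
#   ninfo = fields[0].resample(target_geom, neighbour_info=True)
#   resampled = [f.resample(target_geom, neighbour_info=ninfo)
#                for f in fields]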
def resample_on_regularll(self, borders, resolution_in_degrees, **kwargs):
"""
Resample data (interpolate) on a target geometry defined as a
regular lonlat grid of given resolution.
Wrapper to resample() method, building target geometry.
:param dict borders: dict(lonmin=, latmin=, lonmax=, latmax=)
:param resolution_in_degrees: resolution of the lonlat grid in degrees.
Supposed to be a divisor of lonmax-lonmin
and latmax-latmin.
Other kwargs to be passed to resample() method.
"""
DX = borders['lonmax'] - borders['lonmin']
if DX < 0.:
DX += 360.
DY = borders['latmax'] - borders['latmin']
X = int(DX / resolution_in_degrees) + 1
Y = int(DY / resolution_in_degrees) + 1
grid = {'input_position':(0, 0),
'input_lon':Angle(borders['lonmin'], 'degrees'),
'input_lat':Angle(borders['latmin'], 'degrees'),
'X_resolution':Angle(resolution_in_degrees, 'degrees'),
'Y_resolution':Angle(resolution_in_degrees, 'degrees')}
target_geometry = RegLLGeometry(name='regular_lonlat',
vcoordinate=self.geometry.vcoordinate.deepcopy(),
dimensions={'X':X, 'Y':Y},
grid=grid,
position_on_horizontal_grid='center',
geoid=self.geometry.geoid)
return self.resample(target_geometry, **kwargs)
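# Usage sketch (hypothetical field object): resample onto a 0.5-degree
# regular lon/lat grid over a lon/lat box:
#   new = field.resample_on_regularll(dict(lonmin=-10., latmin=35.,
#                                          lonmax=10., latmax=55.),
#                                     0.5)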
def extend(self, another_field_with_time_dimension):
"""
Extend the field with regard to time dimension with the field given as
argument.
Beware that no consistency check is done between the two fields'
geometries (except that dimensions match) nor between their validities.
"""
another = another_field_with_time_dimension
d1 = self.getdata(d4=True)
d2 = another.getdata(d4=True)
d = numpy.concatenate([d1, d2], axis=0)
self.validity.extend(another.validity)
self.setdata(d)
def remove_level(self, k):
"""
Remove one level from the current field
:param k: index of the level to remove
"""
del self.geometry.vcoordinate.levels[k]
self.setdata(numpy.delete(self.getdata(d4=True), k, 1))
def decumulate(self, center=False):
"""
Decumulate cumulated fields (for a field with a temporal dimension!).
:param center: - if False, values at t are mean values between t-1 and t,
except at t=0 where it remains untouched.
- if True, values at t are then (v[t-1, t] + v[t,t+1])/2.,
except for t=0 where it becomes v[0, 1]
and t=last where it remains the same.
"""
if len(self.validity) > 1:
data = self.getdata(d4=True)
data[1:, ...] = data[1:, ...] - data[0:-1, ...]
if center:
data[0, ...] = data[1, ...] # the actual mean between 0 and 1
if len(self.validity) > 2:
data[1:-1, ...] = (data[1:-1, ...] + data[2:, ...]) / 2.
self.setdata(data)
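# Worked example (center=False): a cumulated time series [0, 2, 6, 12]
# becomes [0, 2, 4, 6], i.e. each value is replaced by the increment
# since the previous validity; the first value is left untouched.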
def time_smooth(self, length, window='center'):
"""
Smooth data in time.
:param length: time span to average on
:param window: replace data[t] by:\n
- data[t-length/2:t+length/2].mean() if 'center';
- data[t-length:t].mean() if 'left';
- data[t:t+length].mean() if 'right'.
"""
assert len(self.validity) > 1
data = copy.deepcopy(self.getdata(d4=True))
if window == 'center':
tinf = length // 2
tsup = length // 2
elif window == 'left':
tinf = length
tsup = 0
elif window == 'right':
tinf = 0
tsup = length
for t in range(len(self.validity)):
t9 = max(0, t - tinf)
t1 = min(len(self.validity), t + tsup)
data[t, ...] = self.getdata(d4=True)[t9:t1, ...].mean(axis=0)
self.setdata(data)
def time_reduce(self, reduce_function='mean'):
"""
Make a time reduction of field,
i.e. apply the requested **reduce_function** on the time dimension of
the field. Replace data (and validity) in place.
:param reduce_function: must be a numpy.ndarray method,
e.g. 'mean', 'sum', 'min', 'max', ...
"""
data = self.getdata(d4=True)
assert hasattr(data, reduce_function), 'unknown method:{} for numpy arrays'.format(reduce_function)
reduced = getattr(data, reduce_function)(axis=0)
validity = self.validity[-1].deepcopy()
validity.set(cumulativeduration=self.validity[-1].get() -
self.validity[0].get(),
statistical_process_on_duration=reduce_function)
self.validity = validity
self.setdata(reduced.squeeze())
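# Usage sketch (hypothetical field with a time dimension): keep the
# time-wise maximum, collapsing the field to a single validity:
#   field.time_reduce('max')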
###################
# PRE-APPLICATIVE #
###################
# (but useful and rather standard) !
# [so that, subject to continuation through updated versions,
# including suggestions/developments by users...]
def stats(self, subzone=None):
"""
Computes some basic statistics on the field, as a dict containing:
{'min', 'max', 'mean', 'std', 'quadmean', 'nonzero'}.
See each of these methods for details.
:param subzone: optional, among ('C', 'CI'), for LAM fields only, computes
the statistics resp. on the C or C+I zone.
Default is no subzone, i.e. the whole field.
"""
return {'min':self.min(subzone=subzone),
'max':self.max(subzone=subzone),
'mean':self.mean(subzone=subzone),
'std':self.std(subzone=subzone),
'quadmean':self.quadmean(subzone=subzone),
'nonzero':self.nonzero(subzone=subzone)}
def min(self, subzone=None):
"""Returns the minimum value of data."""
return super(_D3CommonField, self).min(subzone=subzone)
def max(self, subzone=None):
"""Returns the maximum value of data."""
return super(_D3CommonField, self).max(subzone=subzone)
def mean(self, subzone=None):
"""Returns the mean value of data."""
return super(_D3CommonField, self).mean(subzone=subzone)
def std(self, subzone=None):
"""Returns the standard deviation of data."""
return super(_D3CommonField, self).std(subzone=subzone)
def quadmean(self, subzone=None):
"""Returns the quadratic mean of data."""
return super(_D3CommonField, self).quadmean(subzone=subzone)
def nonzero(self, subzone=None):
"""
Returns the number of non-zero values (whose absolute
value > config.epsilon).
"""
return super(_D3CommonField, self).nonzero(subzone=subzone)
def sgmean(self):
"""Returns an average of the field, weighted by the map factor at each point."""
map_factor = self.geometry.map_factor_field() # Extraction of the map_factor field from geometry object
map_factor_data = map_factor.getdata()
nonmasked_value = map_factor_data.count(1) # Number of longitude points for each (pseudo)latitude circle
myfield_data = self.getdata()
myfield_data_norm = numpy.ma.divide(myfield_data, map_factor_data) # Global map factor normalisation
myfield_data_norm_sumlat = numpy.sum(myfield_data_norm, 1) # Sum over longitude for each latitude circle
myfield_data_norm_sumlat = numpy.ma.divide(myfield_data_norm_sumlat, nonmasked_value) # Normalisation by the number of longitude points for each latitude circle
Nb_lat = len(myfield_data_norm_sumlat) # Nb of latitude circles
myfield_data_sgmean = numpy.sum(myfield_data_norm_sumlat) / Nb_lat # Sum over latitude normalised by the number of latitude circles
return myfield_data_sgmean
def dctspectrum(self, subzone=None, level_index=None, validity_index=None):
"""
Returns the DCT spectrum of the field, as a
:class:`epygram.spectra.Spectrum` instance.
:param subzone: LAM zone on which to compute the DCT,
among (None, 'C', 'CI', 'CIE'). Defaults to 'C'.
:param level_index: the level index on which to compute the DCT
:param validity_index: the validity index on which to compute the DCT
"""
import epygram.spectra as esp
if level_index is None:
if self.geometry.datashape['k']:
raise epygramError("must provide *level_index* for a 3D field.")
else:
level_index = 0
elif level_index > 0 and not self.geometry.datashape['k']:
raise epygramError("invalid *level_index* with regard to vertical levels.")
if validity_index is None:
if len(self.validity) > 1:
raise epygramError("must provide *validity_index* for a time-dimensioned field.")
else:
validity_index = 0
elif validity_index > 0 and len(self.validity) == 1:
raise epygramError("invalid *validity_index* with regard to time dimension.")
if self.geometry.datashape['k']:
field2d = self.getlevel(k=level_index)
else:
field2d = self
if len(self.validity) > 1:
field2dt0 = field2d.getvalidity(validity_index)
else:
field2dt0 = field2d
if subzone is None and self.geometry.grid.get('LAMzone', None) is not None:
subzone = 'C'
variances = esp.dctspectrum(field2dt0.getdata(subzone=subzone))
spectrum = esp.Spectrum(variances[1:],
name=str(self.fid),
resolution=self.geometry.grid['X_resolution'] / 1000.,
mean2=variances[0])
return spectrum
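# Usage sketch (hypothetical 2D gridpoint LAM field):
#   spectrum = field.dctspectrum(subzone='C')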
def histogram(self,
subzone=None,
over=(None, None),
title=None,
get_n_bins_patches=False,
together_with=[],
mask_threshold=None,
center_hist_on_0=False,
minmax_in_title=True,
figsize=None,
**hist_kwargs):
"""
Build a histogram of the field data.
:param subzone: LAM zone on which to compute the histogram,
among (None, 'C', 'CI', 'CIE').
:param over: any existing figure and/or ax to be used for the
plot, given as a tuple (fig, ax), with None for
missing objects. *fig* is the frame of the
matplotlib figure, containing eventually several
subplots (axes); *ax* is the matplotlib axes on
which the drawing is done. When given (is not None),
these objects must be coherent, i.e. ax being one of
the fig axes.
:param title: title for the plot
:param get_n_bins_patches: if True, returns n, bins, patches
(cf. matplotlib hist()) in addition
:param together_with: another field or list of fields to plot on the
same histogram
:param mask_threshold: dict with min and/or max value(s) to mask outside.
:param center_hist_on_0: to center the histogram on 0.
:param minmax_in_title: if True and **range** is not None,
adds min and max values in title.
:param hist_kwargs: any keyword argument to be passed to
matplotlib's hist()
:param figsize: figure sizes in inches, e.g. (5, 8.5).
Default figsize is config.plotsizes.
"""
import matplotlib.pyplot as plt
# initializations
plt.rc('font', family='serif')
if figsize is None:
figsize = config.plotsizes
if self.spectral:
raise epygramError("please convert to gridpoint with sp2gp()" +
" method before plotting.")
mask_outside = {'min':-config.mask_outside,
'max':config.mask_outside}
if mask_threshold is not None:
mask_outside.update(mask_threshold)
# get data
data1d = numpy.ma.masked_outside(self.getdata(subzone=subzone),
mask_outside['min'],
mask_outside['max'])
data1d = [stretch_array(data1d)]
fig, ax = set_figax(*over, figsize=figsize)
if together_with != []:
if isinstance(together_with, D3Field):
together_with = [together_with]
data1d = data1d + [stretch_array(numpy.ma.masked_outside(f.getdata(subzone=subzone),
mask_outside['min'],
mask_outside['max']))
for f in together_with]
# plot params
if title is None:
if len(data1d) == 1:
ax.set_title("\n".join([str(self.fid[sorted(self.fid.keys())[0]]),
str(self.validity.get())]))
else:
ax.set_title(title)
if 'rwidth' not in hist_kwargs.keys():
hist_kwargs['rwidth'] = 0.9
if 'label' not in hist_kwargs.keys() and len(data1d) > 1:
hist_kwargs['label'] = [self.fid[sorted(self.fid.keys())[0]]] + \
[f.fid[sorted(f.fid.keys())[0]]
for f in together_with]
if hist_kwargs.get('range') is not None:
try:
m = float(hist_kwargs['range'][0])
except ValueError:
m = min([d.min() for d in data1d])
try:
M = float(hist_kwargs['range'][1])
except ValueError:
M = max([d.max() for d in data1d])
hist_kwargs['range'] = (m, M)
if minmax_in_title:
minmax_in_title = '(min: ' + \
'{: .{precision}{type}}'.format(min([d.min() for d in data1d]),
type='E', precision=3) + \
' // max: ' + \
'{: .{precision}{type}}'.format(max([d.max() for d in data1d]),
type='E', precision=3) + ')'
else:
minmax_in_title = ''
if hist_kwargs.get('range') is None:
if hist_kwargs.get('bins') is None or \
isinstance(hist_kwargs.get('bins'), int):
hist_kwargs['range'] = (min([d.min() for d in data1d]),
max([d.max() for d in data1d]))
if isinstance(hist_kwargs.get('bins'), list):
if hist_kwargs['bins'][0] == 'min':
hist_kwargs['bins'][0] = min([d.min() for d in data1d])
if hist_kwargs['bins'][-1] == 'max':
hist_kwargs['bins'][-1] = max([d.max() for d in data1d])
# build histogram
n, bins, patches = ax.hist(data1d, **hist_kwargs)
# finalize graphical options
if len(bins) > 8:
step = len(bins) // 7
ticks = [b for b in bins[::step]]
if bins[-1] != ticks[-1]:
ticks.append(bins[-1])
else:
ticks = bins
ax.get_xaxis().set_tick_params(which='both', direction='out')
ax.set_xticks(bins, minor=True)
ax.set_xticks(ticks)
ax.set_xticklabels(['{:.3E}'.format(t) for t in ticks],
rotation=27.5,
horizontalalignment='right')
if center_hist_on_0:
M = max(abs(ticks[0]), abs(ticks[-1]))
ax.set_xlim(-M, M)
ax.axvline(0., linestyle='-', color='k') # vertical lines
else:
ax.set_xlim(ticks[0], ticks[-1])
ax.set_ylabel('Number of gridpoints')
xlabel = 'Gridpoint values'
if minmax_in_title:
xlabel += '\n' + minmax_in_title
ax.set_xlabel(xlabel)
if 'label' in hist_kwargs.keys():
ax.legend()
ax.grid()
if get_n_bins_patches:
return fig, ax, n, bins, patches
else:
return fig, ax
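# Usage sketch (hypothetical fields): compare two fields on one histogram,
# masking extreme values; *bins* is passed through to matplotlib's hist():
#   fig, ax = field1.histogram(together_with=[field2], bins=50,
#                              mask_threshold={'min':-1e5, 'max':1e5})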
def scatter_with(self, other,
fig=None,
ax=None,
figsize=None,
mask_outside=config.mask_outside,
subzone=None,
fidkey=None,
xlabel='__auto__',
ylabel='__auto__',
bisect=True,
**scatter_kwargs):
"""
Make a scatter plot of self data against other data.
:param fig: a preexisting figure on which to plot
:param ax: a preexisting axis on which to plot
:param figsize: figure sizes in inches, e.g. (5, 8.5).
Default figsize is config.plotsizes.
:param mask_outside: threshold to mask values outside (-mask_outside, +mask_outside)
:param subzone: LAM subzone
:param fidkey: fid key to be written on axes labels
:param xlabel: custom x label
:param ylabel: custom y label
:param bisect: plot x=y line
Other kwargs are passed to scatter() method.
"""
for f in (self, other):
if f.spectral:
f.sp2gp()
data, otherdata = self._masked_any(other, mask_outside, subzone=subzone)
otherdata = otherdata.compressed()
data = data.compressed()
fig, ax = set_figax(fig, ax, figsize=figsize)
ax.scatter(data, otherdata, **scatter_kwargs)
if xlabel is not None:
if xlabel == '__auto__':
if fidkey is None:
xlabel = str(self.fid)
else:
xlabel = str(self.fid[fidkey])
ax.set_xlabel(xlabel)
if ylabel is not None:
if ylabel == '__auto__':
if fidkey is None:
ylabel = str(other.fid)
else:
ylabel = str(other.fid[fidkey])
ax.set_ylabel(ylabel)
if bisect:
xmin = min(data.min(), otherdata.min())
xmax = max(data.max(), otherdata.max())
ax.plot((xmin, xmax), (xmin, xmax), color='red')
return fig, ax
def global_shift_center(self, longitude_shift):
"""
Shifts the center of the geometry (and the data accordingly) by
*longitude_shift* (in degrees).
:param longitude_shift: has to be a multiple of the grid's resolution
in longitude.
For global RegLLGeometry grids only.
"""
if self.geometry.name != 'regular_lonlat':
raise epygramError("only for regular lonlat geometries.")
self.geometry.global_shift_center(longitude_shift)
n = int(longitude_shift / self.geometry.grid['X_resolution'].get('degrees'))
data = self.getdata(d4=True)
data[:, :, :, :] = numpy.concatenate((data[:, :, :, n:], data[:, :, :, 0:n]),
axis=3)
self.setdata(data)
def what(self, out=sys.stdout,
validity=True,
vertical_geometry=True,
cumulativeduration=True,
arpifs_var_names=False,
fid=True):
"""
Writes in file a summary of the field.
:param out: the output open file-like object.
:param validity: if True, writes the validity of the field.
:param vertical_geometry: if True, writes the vertical geometry of the
field.
:param cumulativeduration: if False, not written.
:param arpifs_var_names: if True, prints the equivalent 'arpifs'
variable names.
:param fid: if True, prints the fid.
"""
if self.spectral:
spectral_geometry = self.spectral_geometry.truncation
else:
spectral_geometry = None
write_formatted(out, "Kind of producting process", self.processtype)
if validity:
self.validity.what(out, cumulativeduration=cumulativeduration)
self.geometry.what(out,
vertical_geometry=vertical_geometry,
arpifs_var_names=arpifs_var_names,
spectral_geometry=spectral_geometry)
if fid:
for key in self.fid:
write_formatted(out, "fid " + key, self.fid[key])
def dump_to_nc(self, filename, variablename=None, fidkey=None):
"""
Dumps the field in a netCDF file.
:param filename: filename to dump in
:param variablename: variable name in netCDF
:param fidkey: forces *variablename* to self.fid[*fidkey*];
defaults to 'netCDF'
"""
from epygram.formats import resource
_fid = self.fid.get('netCDF')
assert None in (variablename, fidkey), \
"only one of *variablename*, *fidkey* can be provided."
if variablename is not None:
self.fid['netCDF'] = variablename
elif fidkey is not None:
self.fid['netCDF'] = self.fid[fidkey]
else:
assert 'netCDF' in self.fid, \
' '.join(["must provide *variablename* or *fidkey* for",
"determining variable name in netCDF."])
with resource(filename, 'w', fmt='netCDF') as r:
r.writefield(self)
if _fid is not None:
self.fid['netCDF'] = _fid
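# Usage sketch (hypothetical field object):
#   field.dump_to_nc('out.nc', variablename='my_variable')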
#############
# OPERATORS #
#############
def _check_operands(self, other):
"""
Internal method to check compatibility of terms in operations on fields.
"""
if isinstance(other, self.__class__):
assert self.spectral == other.spectral, \
"cannot operate a spectral field with a non-spectral field."
assert self.geometry.dimensions == other.geometry.dimensions, \
' '.join(["operations on fields cannot be done if fields do",
"not share their gridpoint dimensions."])
assert self.spectral_geometry == other.spectral_geometry, \
' '.join(["operations on fields cannot be done if fields do",
"not share their spectral geometry."])
assert len(self.validity) == len(other.validity), \
' '.join(["operations on fields cannot be done if fields do",
"not share their time dimension."])
else:
super(_D3CommonField, self)._check_operands(other)
def __add__(self, other):
"""
Definition of addition, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'+'} and null validity in the general case,
or conserved fid if shared by both fields, and extended validity if
both consistent; to be checked anyway.
"""
new_attributes = dict(structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry,
validity=FieldValidityList(length=len(self.validity)))
if isinstance(other, self.__class__) and (self.validity.is_valid() and other.validity.is_valid()):
if len(self.validity) == 1:
if self.validity[0] == other.validity[0]:
new_attributes.update(validity=self.validity.deepcopy())
else:
if None not in (self.validity.cumulativeduration(), other.validity.cumulativeduration()):
if self.validity.get() != other.validity.get():
if self.validity.get() < other.validity.get():
last = other
first = self
elif other.validity.get() < self.validity.get():
last = self
first = other
if last.validity.get() - last.validity.cumulativeduration() == first.validity.get():
# the beginning of other cumulation is the end of self cumulation
validity = last.validity.deepcopy()
validity.set(cumulativeduration=self.validity.cumulativeduration() + other.validity.cumulativeduration())
new_attributes.update(validity=validity)
if self.fid == other.fid:
new_attributes.update(fid=self.fid)
newfield = self._add(other, **new_attributes)
return newfield
def __mul__(self, other):
"""
Definition of multiplication, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'*'} and null validity.
"""
newfield = self._mul(other,
structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry,
validity=FieldValidityList(length=len(self.validity)))
return newfield
def __sub__(self, other):
"""
Definition of subtraction, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'-'} and null validity in the general case,
or conserved fid if shared by both fields, and extended validity if
both consistent; to be checked anyway.
"""
new_attributes = dict(structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry,
validity=FieldValidityList(length=len(self.validity)))
if isinstance(other, self.__class__) and (self.validity.is_valid() and other.validity.is_valid()):
if len(self.validity) == 1:
if self.validity[0] == other.validity[0]:
new_attributes.update(validity=self.validity.deepcopy())
else:
if None not in (self.validity.cumulativeduration(), other.validity.cumulativeduration()):
if self.validity.get() != other.validity.get():
if other.validity.get() < self.validity.get():
if self.validity.get() - self.validity.cumulativeduration() == \
other.validity.get() - other.validity.cumulativeduration():
# same start of cumulation
validity = self.validity.deepcopy()
validity.set(cumulativeduration=self.validity.get() - other.validity.get())
new_attributes.update(validity=validity)
else:
if self.validity.get() != other.validity.get():
if None in (self.validity.get(), other.validity.get()):
validity = FieldValidityList(length=len(self.validity))
elif self.validity.get() < other.validity.get():
validity = other.validity.deepcopy()
validity.set(cumulativeduration=other.validity.get() - self.validity.get(),
statistical_process_on_duration=8)
elif other.validity.get() < self.validity.get():
validity = self.validity.deepcopy()
validity.set(cumulativeduration=self.validity.get() - other.validity.get(),
statistical_process_on_duration=4)
new_attributes.update(validity=validity)
if self.fid == other.fid:
new_attributes.update(fid=self.fid)
newfield = self._sub(other, **new_attributes)
return newfield
def __div__(self, other):
"""
Definition of division, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'/'} and null validity.
"""
newfield = self._div(other,
structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry,
validity=FieldValidityList(length=len(self.validity)))
return newfield
__radd__ = __add__
__rmul__ = __mul__
def __rsub__(self, other):
"""
Definition of subtraction, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'-'} and null validity.
"""
newfield = self._rsub(other,
structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry,
validity=FieldValidityList(length=len(self.validity)))
return newfield
def __rdiv__(self, other):
"""
Definition of division, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'/'} and null validity.
"""
newfield = self._rdiv(other,
structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry,
validity=FieldValidityList(length=len(self.validity)))
return newfield
class D3Field(_D3CommonField):
"""
3-Dimensions field class.
A field is defined by its identifier 'fid',
its data, its geometry (gridpoint and optionally spectral),
and its validity.
The natural state of a field is gridpoint: a field always has a
gridpoint geometry, but it has a spectral geometry only when it
is spectral.
"""
_collector = ('field',)
_footprint = dict(
attr=dict(
structure=dict(
values=set(['3D'])),
geometry=dict(
type=Geometry,
info="Geometry defining the position of the field gridpoints."),
validity=dict(
type=FieldValidityList,
info="Validity of the field.",
optional=True,
access='rwx',
default=FieldValidityList()),
spectral_geometry=dict(
info="For a spectral field, its spectral geometry handles \
spectral transforms and dimensions.",
type=SpectralGeometry,
optional=True),
processtype=dict(
optional=True,
access='rwx',
info="Generating process.")
)
)
##############
# ABOUT DATA #
##############
def sp2gp(self):
"""
Transforms the spectral field into gridpoint, according to its spectral
geometry. Replaces data in place.
The spectral transform subroutine is actually included in the spectral
geometry's *sp2gp()* method.
"""
if self.spectral:
gpdims = self._get_gpdims_for_spectral_transforms()
if self.geometry.rectangular_grid:
# LAM
gpdata = numpy.empty(self.geometry.get_datashape(dimT=len(self.validity), d4=True))
else:
# global
gpdata = numpy.ma.zeros(self.geometry.get_datashape(dimT=len(self.validity), d4=True))
for t in range(len(self.validity)):
for k in range(len(self.geometry.vcoordinate.levels)):
spdata_i = self.getdata(d4=True)[t, k, :]
gpdata_i = self.spectral_geometry.sp2gp(spdata_i, gpdims)
gpdata_i = self.geometry.reshape_data(gpdata_i)
gpdata[t, k, :, :] = gpdata_i[:, :]
self._attributes['spectral_geometry'] = None
self.setdata(gpdata)
def gp2sp(self, spectral_geometry):
"""
Transforms the gridpoint field into spectral space, according to the
*spectral_geometry* mandatorily passed as argument. Replaces data in
place.
:param spectral_geometry: instance of SpectralGeometry, actually
containing the spectral transform subroutine (in
its own *gp2sp()* method).
"""
assert isinstance(spectral_geometry, SpectralGeometry)
if not self.spectral:
gpdims = self._get_gpdims_for_spectral_transforms()
spdata = None
for t in range(len(self.validity)):
for k in range(len(self.geometry.vcoordinate.levels)):
gpdata_i = stretch_array(self.getdata(d4=True)[t, k, :, :])
spdata_i = spectral_geometry.gp2sp(gpdata_i, gpdims)
n = len(spdata_i)
if spdata is None:
spdata = numpy.empty((len(self.validity),
len(self.geometry.vcoordinate.levels),
n))
spdata[t, k, :] = spdata_i[:]
self._attributes['spectral_geometry'] = spectral_geometry
self.setdata(spdata)
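# Sketch of a spectral round-trip, assuming *field* starts spectral; keep a
# reference to the spectral geometry before sp2gp(), since sp2gp() drops it:
#     sp_geom = field.spectral_geometry
#     field.sp2gp()
#     # ... work on gridpoint values ...
#     field.gp2sp(sp_geom)  # back to spectral space, in place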
def _get_gpdims_for_spectral_transforms(self):
"""
Build a dictionary containing gridpoint dimensions for the call to
spectral transforms.
"""
if self.geometry.rectangular_grid:
# LAM
gpdims = {}
for dim in ['X', 'Y', 'X_CIzone', 'Y_CIzone']:
gpdims[dim] = self.geometry.dimensions[dim]
for item in ['X_resolution', 'Y_resolution']:
gpdims[item] = self.geometry.grid[item]
else:
# global
gpdims = {}
for dim in ['lat_number', 'lon_number_by_lat']:
gpdims[dim] = self.geometry.dimensions[dim]
return gpdims
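# For illustration only (values below are made up and depend on the actual
# geometry), the returned dict is shaped like:
#     LAM:    {'X': 750, 'Y': 720, 'X_CIzone': 739, 'Y_CIzone': 709,
#              'X_resolution': 1300., 'Y_resolution': 1300.}
#     global: {'lat_number': 320, 'lon_number_by_lat': [20, 25, ...]}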
[docs] def compute_xy_spderivatives(self):
"""
Computes the derivatives of the field in spectral space, then transforms
back to gridpoint space.
Returns the two derivative fields.
The spectral transform and derivatives subroutines are actually included
in the spectral geometry's *compute_xy_spderivatives()* method.
"""
if self.spectral:
gpdims = self._get_gpdims_for_spectral_transforms()
if self.geometry.rectangular_grid:
# LAM
gpderivX = numpy.empty(self.geometry.get_datashape(dimT=len(self.validity), d4=True))
gpderivY = numpy.empty(self.geometry.get_datashape(dimT=len(self.validity), d4=True))
else:
# global
gpderivX = numpy.ma.zeros(self.geometry.get_datashape(dimT=len(self.validity), d4=True))
gpderivY = numpy.ma.zeros(self.geometry.get_datashape(dimT=len(self.validity), d4=True))
for t in range(len(self.validity)):
for k in range(len(self.geometry.vcoordinate.levels)):
spdata_i = self.getdata(d4=True)[t, k, :]
(dx, dy) = self.spectral_geometry.compute_xy_spderivatives(spdata_i, gpdims)
dx = self.geometry.reshape_data(dx)
dy = self.geometry.reshape_data(dy)
gpderivX[t, k, :, :] = dx[:, :]
gpderivY[t, k, :, :] = dy[:, :]
field_dX = copy.deepcopy(self)
field_dY = copy.deepcopy(self)
for field in (field_dX, field_dY):
field._attributes['spectral_geometry'] = None
field_dX.fid = {'derivative':'x'}
field_dY.fid = {'derivative':'y'}
field_dX.setdata(gpderivX)
field_dY.setdata(gpderivY)
else:
raise epygramError('field must be spectral to compute its spectral derivatives.')
return (field_dX, field_dY)
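# Usage sketch, assuming *field* is spectral:
#     field_dx, field_dy = field.compute_xy_spderivatives()
#     # both results are gridpoint fields, with fid {'derivative':'x'} / {'derivative':'y'}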
[docs] def getdata(self, subzone=None, d4=False):
"""
Returns the field data, with 3D shape if the field is not spectral,
2D if spectral.
:param subzone: optional, among ('C', 'CI'), for LAM fields only,
returns the data resp. on the C or C+I zone.
Default is no subzone, i.e. the whole field.
:param d4: - if True, returned values are shaped as a 4-dimension array
- if False, the shape of returned values is determined with
respect to geometry.
Shape of 4D data: \n
- Rectangular grids:\n
grid[t,k,0,0] is SW, grid[t,k,-1,-1] is NE \n
grid[t,k,0,-1] is SE, grid[t,k,-1,0] is NW \n
with k the level, t the temporal dimension
- Gauss grids:\n
grid[t,k,0,:Nj] is first (Northern) band of latitude, masked after
Nj = number of longitudes for latitude j \n
grid[t,k,-1,:Nj] is last (Southern) band of latitude (idem). \n
with k the level, t the temporal dimension
"""
data = self._data
if not self.spectral and subzone is not None:
if self.geometry.grid.get('LAMzone') is not None:
data = self.geometry.extract_subzone(data, subzone)
else:
raise epygramError("*subzone* cannot be provided for this field.")
if not d4:
data = data.squeeze()
return data
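# Access sketches (the *subzone* argument only makes sense for LAM fields):
#     data = field.getdata()               # squeezed shape, per geometry
#     data4d = field.getdata(d4=True)      # always (t, z, y, x), or (t, z, n) if spectral
#     data_c = field.getdata(subzone='C')  # LAM fields only: C zone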
[docs] def setdata(self, data):
"""
Sets field data, checking that *data* has the proper shape according to
the geometry.
:param data: dimensions should in any case be ordered in a subset of
(t,z,y,x), or (t,z,n) if spectral (2D spectral coefficients must be 1D
with ad hoc ordering, n being the total number of spectral
coefficients).
*data* may be 4D (3D if spectral) even if the field is not, as long as
the above dimensions ordering is respected.
"""
if not isinstance(data, numpy.ndarray):
data = numpy.array(data)
# determination of field shape
if self.spectral:
shp = (len(self.validity),
len(self.geometry.vcoordinate.levels),
data.shape[-1])
elif 'gauss' in self.geometry.name:
shp = (len(self.validity),
len(self.geometry.vcoordinate.levels),
self.geometry.dimensions['lat_number'],
self.geometry.dimensions['max_lon_number'])
else:
shp = (len(self.validity),
len(self.geometry.vcoordinate.levels),
self.geometry.dimensions['Y'],
self.geometry.dimensions['X'])
if (len(data.shape) == 3 and self.spectral) or len(data.shape) == 4:
# data is given as 4D: check compatibility
assert data.shape == shp, \
' '.join(['data', str(data.shape),
'should have shape', str(shp)])
else:
# data is squeezed: reshape
try:
data = data.reshape(shp)
except ValueError:
print("'data' shape has to be compatible with field shape: {}".format(shp))
raise
"""# find indexes corresponding to dimensions
dimensions = 0
indexes = {'t':0, 'z':1, 'y':2, 'x':3}
# t, z
if len(self.validity) > 1:
dimensions += 1
else:
indexes['t'] = None
for i in ('z', 'y', 'x'):
indexes[i] = indexes[i] - 1
if self.geometry.datashape['k']:
dimensions += 1
else:
indexes['z'] = None
for i in ('y', 'x'):
indexes[i] = indexes[i] - 1
# y, x or spectral ordering
if self.spectral:
dimensions += 1
dataType = "spectral"
else:
if self.geometry.datashape['j']:
dimensions += 1
else:
indexes['y'] = None
for i in ('x',):
indexes[i] = indexes[i] - 1
if self.geometry.datashape['i']:
dimensions += 1
else:
indexes['x'] = None
dataType = "gridpoint"
# check dimensions
assert (len(numpy.shape(data)) == dimensions
or numpy.shape(data) == (1,)), \
dataType + " data should be " + str(dimensions) + "D array."
if indexes['t'] is not None:
assert data.shape[0] == len(self.validity), \
' == '.join(['data.shape[0] should be len(self.validity)',
str(len(self.validity))])
if self.geometry.datashape['k']:
assert data.shape[indexes['z']] == len(self.geometry.vcoordinate.levels), \
' == '.join(['data.shape[' + str(indexes['z']) +
'] should be len(self.geometry.vcoordinate.levels)',
str(len(self.geometry.vcoordinate.levels))])
if not self.spectral:
if 'gauss' in self.geometry.name:
if self.geometry.datashape['j']:
assert data.shape[indexes['y']] == self.geometry.dimensions['lat_number'], \
' == '.join(['data.shape[' + str(indexes['y']) +
"] should be self.geometry.dimensions['lat_number']",
str(self.geometry.dimensions['lat_number'])])
if self.geometry.datashape['i']:
assert data.shape[indexes['x']] == self.geometry.dimensions['max_lon_number'], \
' == '.join(['data.shape[' + str(indexes['x']) +
"] should be self.geometry.dimensions['max_lon_number']",
str(self.geometry.dimensions['max_lon_number'])])
else:
if self.geometry.datashape['j']:
assert data.shape[indexes['y']] == self.geometry.dimensions['Y'], \
' == '.join(['data.shape[' + str(indexes['y']) +
"] should be self.geometry.dimensions['Y']",
str(self.geometry.dimensions['Y'])])
if self.geometry.datashape['i']:
assert data.shape[indexes['x']] == self.geometry.dimensions['X'], \
' == '.join(['data.shape[' + str(indexes['x']) +
"] should be self.geometry.dimensions['X']",
str(self.geometry.dimensions['X'])])
# reshape to 4D
data = data.reshape(shp)"""
super(D3Field, self).setdata(data)
data = property(getdata, setdata, Field.deldata, "Accessor to the field data.")
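# setdata() accepts squeezed arrays; e.g. for a single-validity, single-level
# field, a 2D (Y, X) array is reshaped internally to (1, 1, Y, X):
#     field.setdata(numpy.zeros((field.geometry.dimensions['Y'],
#                                field.geometry.dimensions['X'])))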
[docs] def select_subzone(self, subzone):
"""
If a LAMzone defines the field, select only the *subzone* from it.
:param subzone: among ('C', 'CI').
Warning: modifies the field and its geometry in place!
"""
if self.geometry.grid.get('LAMzone') is not None:
data = self.getdata(subzone=subzone)
self._attributes['geometry'] = self.geometry.select_subzone(subzone)
self.setdata(data)
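# Sketch: restrict a LAM field to its C zone, in place:
#     field.select_subzone('C')  # both data and geometry are updated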
[docs] def getvalue_ij(self, i=None, j=None, k=None, t=None,
one=True):
"""
Returns the value of the field at the point of indices (*i, j, k, t*).
Beware: (*i, j, k, t*) are Python indices, ranging from 0 to
dimension-1.
:param i: the index in X direction
:param j: the index in Y direction
:param k: the index of the level (not a value in Pa or m...)
:param t: the index of the temporal dimension (not a validity object)
*k* and *t* can be scalar even if *i* and *j* are arrays.
If *one* is False, returns [value] instead of value.
"""
if t is None:
if len(self.validity) > 1:
raise epygramError("*t* is mandatory when there are several validities")
t = 0
if k is None:
if self.geometry.datashape['k']:
raise epygramError("*k* is mandatory when the field has a vertical coordinate")
k = 0
if j is None:
if self.geometry.datashape['j']:
raise epygramError("*j* is mandatory when the field has two horizontal dimensions")
j = 0
if i is None:
if self.geometry.datashape['i']:
raise epygramError("*i* is mandatory when the field has a horizontal dimension")
i = 0
i, j = as_numpy_array(i).flatten(), as_numpy_array(j).flatten()
k, t = as_numpy_array(k).flatten(), as_numpy_array(t).flatten()
if not numpy.all(self.geometry.point_is_inside_domain_ij(i, j)):
raise ValueError("point is out of field domain.")
sizes = set([len(x) for x in [i, j, k, t]])
if len(sizes) > 2 or (len(sizes) == 2 and 1 not in sizes):
raise epygramError("each of i, j, k and t must be scalar or have the same length as the others")
value = (self.getdata(d4=True)[t, k, j, i]).copy()
if value.size == 1 and one:
value = value.item()
return value
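# Sketches (index values are hypothetical):
#     v = field.getvalue_ij(i=10, j=20, k=0, t=0)               # scalar
#     vv = field.getvalue_ij(i=[10, 11], j=[20, 21], k=0, t=0)  # array of 2 values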
[docs] def getlevel(self, level=None, k=None):
"""
Returns a level of the field as a new field.
:param level: the requested level expressed in coordinate value (Pa, m...)
:param k: the index of the requested level
"""
if k is None and level is None:
raise epygramError("You must give k or level.")
if k is not None and level is not None:
raise epygramError("You cannot give, at the same time, k and level")
if level is not None:
if level not in self.geometry.vcoordinate.levels:
raise epygramError("The requested level does not exist.")
my_level = level
my_k = self.geometry.vcoordinate.levels.index(level)
else:
my_k = k
my_level = self.geometry.vcoordinate.levels[k]
if self.structure not in ['3D', 'V2D', 'V1D']:
raise epygramError("It's not possible to extract a level from a " + self.structure + " field.")
kwargs_vcoord = {'typeoffirstfixedsurface': self.geometry.vcoordinate.typeoffirstfixedsurface,
'position_on_grid': self.geometry.vcoordinate.position_on_grid,
'levels':[my_level]}
if self.geometry.vcoordinate.typeoffirstfixedsurface in (118, 119):
kwargs_vcoord['grid'] = copy.copy(self.geometry.vcoordinate.grid)
newvcoordinate = VGeometry(**kwargs_vcoord)
newgeometry = self.geometry.deepcopy()
newgeometry.vcoordinate = newvcoordinate
generic_fid = self.fid.get('generic', {})
generic_fid['level'] = my_level if not isinstance(my_level, numpy.ndarray) else my_k # to avoid arrays in fid
kwargs_field = {'structure':newgeometry.structure,
'validity':self.validity.copy(),
'processtype':self.processtype,
'geometry':newgeometry,
'fid':{'generic':generic_fid}}
if self.spectral_geometry is not None:
kwargs_field['spectral_geometry'] = self.spectral_geometry.copy()
newfield = fpx.field(**kwargs_field)
newfield.setdata(self.getdata(d4=True)[:, my_k:my_k + 1, ...])
return newfield
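# Sketches (level value is hypothetical):
#     f850 = field3d.getlevel(level=85000)  # by coordinate value (here, Pa)
#     f_k0 = field3d.getlevel(k=0)          # by index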
[docs] def getvalidity(self, index_or_validity):
"""
Returns the field restrained to one of its temporal validity as a new
field.
:param index_or_validity: can be either a
:class:`epygram.base.FieldValidity` instance
or the index of the requested validity in the
field's FieldValidityList.
"""
assert isinstance(index_or_validity, FieldValidity) or isinstance(index_or_validity, int), \
"index_or_validity* should be either a FieldValidity instance or an int."
if isinstance(index_or_validity, FieldValidity):
validity = index_or_validity
try:
index = self.validity.index(index_or_validity)
except ValueError:
raise ValueError('*validity* not in field validity.')
else:
index = index_or_validity
validity = self.validity[index]
newfield = self.deepcopy()
newfield.validity = validity
newfield.setdata(self.getdata(d4=True)[index:index + 1, :, :, :])
if len(numpy.array(newfield.geometry.vcoordinate.levels).shape) > 1:
# levels are, at least, dependent on position
levels4d = newfield.geometry.get_levels(d4=True, nb_validities=len(self.validity))
newfield.geometry.vcoordinate.levels = list(levels4d[index, ...].squeeze())
return newfield
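# Sketches:
#     f_t0 = field.getvalidity(0)                  # by index
#     f_v1 = field.getvalidity(field.validity[1])  # by FieldValidity instance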
[docs] def center(self):
"""
Performs an averaging on data values to center the field on the grid
(a.k.a. Shuman averaging) and modifies the geometry accordingly.
"""
posSpl = self.geometry.position_on_horizontal_grid.split('-')
if len(posSpl) == 2: # not center
posY, posX = posSpl
if (posY == 'lower' or posX == 'left'):
if not isinstance(self.geometry, ProjectedGeometry):
raise NotImplementedError("Centering is not available with non-projected geometries")
data = self.getdata(d4=True)
if data is not None: # This test is useful if field has been retrieved with getdata=False
if posY == 'lower':
data[:, :, :-1, :] = 0.5 * (data[:, :, :-1, :] + data[:, :, 1:, :])
data[:, :, -1, :] = data[:, :, -2, :]
if posX == 'left':
data[:, :, :, :-1] = 0.5 * (data[:, :, :, :-1] + data[:, :, :, 1:])
data[:, :, :, -1] = data[:, :, :, -2]
self.setdata(data)
self.geometry.position_on_horizontal_grid = 'center'
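# Sketch: re-center a field staggered on the lower-left corner of grid cells
# (e.g. a wind component), assuming *u* is such a field:
#     u.center()  # in place; position_on_horizontal_grid becomes 'center'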
[docs]class D3VirtualField(_D3CommonField):
"""
3-Dimensions Virtual field class.
Data is taken from other fields, either:
- a given *fieldset*
- a *resource* in which the fields defined by *resource_fids* are stored.
"""
_collector = ('field',)
_footprint = dict(
attr=dict(
structure=dict(
values=set(['3D'])),
fieldset=dict(
info="Set of real fields that can compose the Virtual Field.",
type=FieldSet,
optional=True,
default=FieldSet()),
resource=dict(
info="Resource in which is stored the fields defined by \
resource_fids.",
type=Resource,
optional=True),
resource_fids=dict(
info="Definition of the fields in resource that compose the \
virtual field.",
type=FPList,
optional=True,
default=FPList()),
)
)
def __init__(self, *args, **kwargs):
super(D3VirtualField, self).__init__(*args, **kwargs)
if len(self.fieldset) != 0:
if self.resource is not None or self.resource_fids != FPList():
raise epygramError("You cannot set fieldset and (resource or resource_fids) at the same time.")
def fieldGenerator(getdata):
for field in self.fieldset:
yield field.fid, field
self._fieldGenerator = fieldGenerator
def getFieldByFid(fid, getdata):
result = [field for field in self.fieldset if field.fid == fid]
if len(result) != 1:
raise epygramError("There is no, or more than one, field(s) matching with this fid.")
return result[0]
self._getFieldByFid = getFieldByFid
self._mode = 'fieldset'
else:
if self.resource is None or self.resource_fids == FPList():
raise epygramError("If you do not set fieldset, you need to provide resource and resource_fids.")
if all([k in self.resource.listfields() for k in self.resource_fids]):
fidlist = self.resource_fids
else:
# resource_fids may contain patterns to search for instead of exact fids
fidlist = self.resource.find_fields_in_resource(seed=self.resource_fids,
fieldtype=['Point', 'V1D', 'V2D', 'H2D', '3D'])
if len(fidlist) == 0:
raise epygramError("There is no field in resource matching with resource_fids")
def fieldGenerator(getdata):
for fid in fidlist:
field = self.resource.readfield(fid, getdata=getdata)
yield fid, field
self._fieldGenerator = fieldGenerator
def getFieldByFid(fid, getdata):
return self.resource.readfield(fid, getdata=getdata)
self._getFieldByFid = getFieldByFid
self._mode = 'resource'
first = True
self._fidList = []
levelList = []
levelIsArray = False
for fid, field in self._fieldGenerator(getdata=False):
if field.structure not in ['Point', 'V1D', 'V2D', 'H1D', 'H2D', '3D']:
raise epygramError("3D virtual fields must be build from 'physical' fields only")
if first:
self._structure = field.structure
self._geometry = field.geometry.deepcopy() # We need a deepcopy as we modify it
self._validity = field.validity.copy()
if field.spectral_geometry is not None:
self._spectral_geometry = field.spectral_geometry.copy()
else:
self._spectral_geometry = None
self._processtype = field.processtype
else:
if self._structure != field.structure:
raise epygramError("All fields must share the structure")
if self._geometry.structure != field.geometry.structure or \
self._geometry.name != field.geometry.name or \
self._geometry.grid != field.geometry.grid or \
self._geometry.dimensions != field.geometry.dimensions or \
self._geometry.position_on_horizontal_grid != field.geometry.position_on_horizontal_grid:
raise epygramError("All fields must share the horizontal geometry")
if self._geometry.projected_geometry or field.geometry.projected_geometry or \
self._geometry.name == 'academic' or field.geometry.name == 'academic':
if self._geometry.projection != field.geometry.projection:
raise epygramError("All fields must share the geometry projection")
if self._geometry.projected_geometry or field.geometry.projected_geometry:
if self._geometry.projection != field.geometry.projection or \
self._geometry.geoid != field.geometry.geoid:
raise epygramError("All fields must share the geometry geoid")
if self._geometry.vcoordinate.typeoffirstfixedsurface != field.geometry.vcoordinate.typeoffirstfixedsurface or \
self._geometry.vcoordinate.position_on_grid != field.geometry.vcoordinate.position_on_grid:
raise epygramError("All fields must share the vertical geometry")
if self._geometry.vcoordinate.grid is not None or field.geometry.vcoordinate.grid is not None:
if self._geometry.vcoordinate.grid != field.geometry.vcoordinate.grid:
raise epygramError("All fields must share the vertical grid")
if self._validity != field.validity:
raise epygramError("All fields must share the validity")
if self._spectral_geometry != field.spectral_geometry:
raise epygramError("All fields must share the spectral geometry")
if self._processtype != field.processtype:
raise epygramError("All fields must share the processtype")
if len(field.geometry.vcoordinate.levels) != 1:
raise epygramError("fields must have only one level")
levelIsArray = levelIsArray or isinstance(field.geometry.vcoordinate.levels[0], numpy.ndarray)
if (not isinstance(field.geometry.vcoordinate.levels[0], numpy.ndarray)) and field.geometry.vcoordinate.levels[0] in levelList:
raise epygramError("This level have already been found")
levelList.append(field.geometry.vcoordinate.levels[0])
if fid in self._fidList:
raise epygramError("fields must have different fids")
self._fidList.append(fid)
kwargs_vcoord = dict(typeoffirstfixedsurface=self._geometry.vcoordinate.typeoffirstfixedsurface,
position_on_grid=self._geometry.vcoordinate.position_on_grid)
if self._geometry.vcoordinate.grid is not None:
kwargs_vcoord['grid'] = self._geometry.vcoordinate.grid
if levelIsArray:
kwargs_vcoord['levels'] = levelList
else:
kwargs_vcoord['levels'], self._fidList = (list(t) for t in zip(*sorted(zip(levelList, self._fidList)))) # TOBECHECKED
newvcoordinate = VGeometry(**kwargs_vcoord)
newstructure = {'3D': '3D',
'H2D': '3D',
'V1D': 'V1D',
'Point': 'V1D',
'V2D': 'V2D',
'H1D': 'V2D'}[field.structure]
assert newstructure == self.structure, \
"Individual fields structure do not match the field strucuture"
self._geometry = self.geometry.deepcopy()
self._geometry.vcoordinate = newvcoordinate
self._spgpOpList = []
def as_real_field(self, getdata=True):
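"""
Returns a real (non-virtual) field of the same structure, geometry,
validity and spectral geometry, gathering the data of all levels if
*getdata* is True.
"""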
field3d = fpx.field(fid=dict(self.fid),
structure={'H2D':'3D',
'Point':'V1D',
'H1D':'V2D',
'V1D':'V1D',
'V2D':'V2D',
'3D':'3D'}[self._structure],
geometry=self.geometry,
validity=self.validity,
spectral_geometry=self.spectral_geometry,
processtype=self.processtype)
if getdata:
field3d.setdata(self.getdata(d4=True))
return field3d
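# A sketch of building a virtual 3D field over a resource, then materializing
# it (the fid and the fid pattern below are hypothetical):
#     from footprints import proxy as fpx
#     virtual = fpx.field(structure='3D',
#                         fid={'short_name': 't'},
#                         resource=r,  # an open epygram Resource
#                         resource_fids=FPList(['S0*TEMPERATURE']))
#     real3d = virtual.as_real_field()  # gathers all levels into a D3Field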
@property
def geometry(self):
return self._geometry
@property
def validity(self):
return self._validity
@property
def spectral_geometry(self):
return self._spectral_geometry
@property
def processtype(self):
return self._processtype
##############
# ABOUT DATA #
##############
[docs] def sp2gp(self):
"""
Transforms the spectral field into gridpoint, according to its spectral
geometry. Replaces data in place.
The spectral transform subroutine is actually included in the spectral
geometry's *sp2gp()* method.
"""
self._spectral_geometry = None
if self._mode == 'resource':
self._spgpOpList.append(('sp2gp', {}))
else:
for field in self.fieldset:
field.sp2gp()
[docs] def gp2sp(self, spectral_geometry):
"""
Transforms the gridpoint field into spectral space, according to the
*spectral_geometry* mandatorily passed as argument. Replaces data in
place.
:param spectral_geometry: instance of SpectralGeometry, actually
containing the spectral transform subroutine (in
its own *gp2sp()* method).
"""
self._spectral_geometry = spectral_geometry
if self._mode == 'resource':
self._spgpOpList.append(('gp2sp', {'spectral_geometry':spectral_geometry}))
else:
for field in self.fieldset:
field.gp2sp(spectral_geometry)
[docs] def getdata(self, subzone=None, d4=False):
"""
Returns the field data, with 3D shape if the field is not spectral,
2D if spectral.
:param subzone: optional, among ('C', 'CI'), for LAM fields only,
returns the data resp. on the C or C+I zone.
Default is no subzone, i.e. the whole field.
:param d4: - if True, returned values are shaped as a 4-dimension array
- if False, the shape of returned values is determined with
respect to geometry.
Shape of 3D data: \n
- Rectangular grids:\n
grid[k,0,0] is SW, grid[k,-1,-1] is NE \n
grid[k,0,-1] is SE, grid[k,-1,0] is NW \n
with k the level
- Gauss grids:\n
grid[k,0,:Nj] is first (Northern) band of latitude, masked after
Nj = number of longitudes for latitude j \n
grid[k,-1,:Nj] is last (Southern) band of latitude (idem). \n
with k the level
"""
dataList = []
concat = numpy.concatenate
arr = numpy.array
missing = set()
for k in range(len(self.geometry.vcoordinate.levels)):
data = self.getlevel(k=k).getdata(subzone=subzone, d4=d4)
if isinstance(data, numpy.ma.masked_array):
concat = numpy.ma.concatenate
arr = numpy.ma.array
missing.add(data.fill_value)
dataList.append(data)
if d4:
# vertical dimension already exists, and is the second one
result = concat(dataList, axis=1)
else:
if len(self.validity) > 1:
# vertical dimension does not exist and
# must be the second one of the resulting array
result = numpy.stack(dataList, axis=1)
else:
# vertical dimension does not exist and
# must be the first one of the resulting array
result = arr(dataList)
if len(missing) == 1:
result.set_fill_value(missing.pop())
return result
[docs] def setdata(self, data):
"""setdata() not implemented on virtual fields."""
raise epygramError("setdata cannot be implemented on virtual fields")
[docs] def deldata(self):
"""deldata() not implemented on virtual fields."""
raise epygramError("deldata cannot be implemented on virtual fields")
data = property(getdata)
[docs] def getvalue_ij(self, i=None, j=None, k=None, t=None,
one=True):
"""
Returns the value of the field at the point of indices (*i, j, k, t*).
Beware: (*i, j, k, t*) are Python indices, ranging from 0 to
dimension-1.
:param i: the index in X direction
:param j: the index in Y direction
:param k: the index of the level (not a value in Pa or m...)
:param t: the index of the temporal dimension (not a validity object)
*k* and *t* can be scalar even if *i* and *j* are arrays.
If *one* is False, returns [value] instead of value.
"""
if t is None:
if len(self.validity) > 1:
raise epygramError("*t* is mandatory when there are several validities")
t = 0
if k is None:
if self.geometry.datashape['k']:
raise epygramError("*k* is mandatory when the field has a vertical coordinate")
k = 0
if j is None:
if self.geometry.datashape['j']:
raise epygramError("*j* is mandatory when the field has two horizontal dimensions")
j = 0
if i is None:
if self.geometry.datashape['i']:
raise epygramError("*i* is mandatory when the field has a horizontal dimension")
i = 0
i, j = as_numpy_array(i).flatten(), as_numpy_array(j).flatten()
k, t = as_numpy_array(k).flatten(), as_numpy_array(t).flatten()
if not numpy.all(self.geometry.point_is_inside_domain_ij(i, j)):
raise ValueError("point is out of field domain.")
sizes = set([len(x) for x in [i, j, k, t]])
if len(sizes) > 2 or (len(sizes) == 2 and 1 not in sizes):
raise epygramError("each of i, j, k and t must be scalar or have the same length as the others")
if max(sizes) > 1 and len(k) == 1:
k = k.repeat(max(sizes))
oldk = k[0]
data = self.getlevel(k=oldk).getdata(d4=True)
value = numpy.ndarray((max(sizes),), dtype=data.dtype)
for thisk in numpy.unique(k):
if thisk != oldk:
data = self.getlevel(k=thisk).getdata(d4=True)
oldk = thisk
mask = k == thisk
value[mask] = data[t[mask] if len(t) > 1 else t,
0,
j[mask] if len(j) > 1 else j,
i[mask] if len(i) > 1 else i]
if value.size == 1 and one:
value = value.item()
return value
[docs] def getlevel(self, level=None, k=None):
"""
Returns a level of the field as a new field.
:param level: the requested level expressed in coordinate value (Pa, m...)
:param k: the index of the requested level
"""
if k is None and level is None:
raise epygramError("You must give k or level.")
if k is not None and level is not None:
raise epygramError("You cannot give, at the same time, k and level")
if level is not None:
if level not in self.geometry.vcoordinate.levels:
raise epygramError("The requested level does not exist.")
my_k = self.geometry.vcoordinate.levels.index(level)
else:
my_k = k
result = self._getFieldByFid(self._fidList[my_k], True)
# update vcoordinate in case this attribute was changed in the geometry
vcoord = self.geometry.vcoordinate.deepcopy()
levels = vcoord.levels[my_k]
for _ in range(len(vcoord.levels)):
vcoord.levels.pop()
vcoord.levels.append(levels)
result.geometry.vcoordinate = vcoord
for op, kwargs in self._spgpOpList:
if op == 'sp2gp':
result.sp2gp(**kwargs)
elif op == 'gp2sp':
result.gp2sp(**kwargs)
else:
raise epygramError("operation not known")
return result
footprints.collectors.get(tag='fields').fasttrack = ('structure',)