#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains classes for netCDF4 resource.
"""
from __future__ import print_function, absolute_import, unicode_literals, division
import copy
import json
import sys
import numpy
from collections import OrderedDict
import datetime
import six
import re
import netCDF4
import footprints
from footprints import proxy as fpx, FPDict, FPList
from bronx.syntax.arrays import stretch_array
from epygram import config, epygramError, util
from epygram.base import FieldValidity, FieldValidityList
from epygram.resources import FileResource
from epygram.util import Angle
from epygram.geometries import (VGeometry, ProjectedGeometry, GaussGeometry,
RegLLGeometry, AcademicGeometry, UnstructuredGeometry)
__all__ = ['netCDF']
epylog = footprints.loggers.getLogger(__name__)
_typeoffirstfixedsurface_dict = {'altitude':102,
'height':103,
'atmosphere_hybrid_sigma_pressure_coordinate':119,
'atmosphere_hybrid_height_coordinate':118,
'pressure':100}
_typeoffirstfixedsurface_short_dict = _typeoffirstfixedsurface_dict.copy()
_typeoffirstfixedsurface_short_dict.update({'hybrid-pressure':119,
'hybrid-height':118})
_typeoffirstfixedsurface_dict_inv = {v:k for k, v in _typeoffirstfixedsurface_dict.items()}
_typeoffirstfixedsurface_short_dict_inv = {v:k for k, v in _typeoffirstfixedsurface_short_dict.items()}
_proj_dict = {'lambert':'lambert_conformal_conic',
'mercator':'mercator',
'polar_stereographic':'polar_stereographic',
'space_view':'vertical_perspective'}
_proj_dict_inv = {v:k for k, v in _proj_dict.items()}
def _default_numpy2json(obj):
"""Helper to encode numpy arrays to json."""
if type(obj).__module__ == numpy.__name__:
if isinstance(obj, numpy.ndarray):
return obj.tolist()
elif any([isinstance(obj, t) for t in [numpy.int8, numpy.int16, numpy.int32, numpy.int64]]):
return int(obj)
elif any([isinstance(obj, t) for t in [numpy.float32, numpy.float64]]):
return float(obj)
else:
return obj.item()
raise TypeError('Unknown type:', type(obj))
[docs]class netCDF(FileResource):
"""Class implementing all specificities for netCDF (4) resource format."""
_footprint = dict(
attr=dict(
format=dict(
values=set(['netCDF']),
default='netCDF'),
behaviour=dict(
info="Describes how fields are defined in resource.",
type=FPDict,
optional=True,
default=config.netCDF_default_behaviour)
)
)
def __init__(self, *args, **kwargs):
self.isopen = False
super(netCDF, self).__init__(*args, **kwargs)
if self.openmode in ('r', 'a'):
try:
guess = netCDF4.Dataset(self.container.abspath, self.openmode)
except (RuntimeError, UnicodeEncodeError):
raise IOError("this resource is not a netCDF one.")
else:
guess.close()
behaviour = copy.copy(config.netCDF_default_behaviour)
behaviour.update(self.behaviour)
self._attributes['behaviour'] = behaviour
if not self.fmtdelayedopen:
self.open()
[docs] def open(self, openmode=None):
"""
Opens a netCDF and initializes some attributes.
:param openmode: optional, to open with a specific openmode, eventually
different from the one specified at initialization.
"""
super(netCDF, self).open(openmode=openmode)
self._nc = netCDF4.Dataset(self.container.abspath, self.openmode)
self.isopen = True
if self.openmode == 'w':
self.set_default_global_attributes()
[docs] def close(self):
"""Closes a netCDF."""
if hasattr(self, '_nc') and self._nc._isopen:
self._nc.close()
self.isopen = False
[docs] def variables_number(self):
"""Return the number of variables in resource."""
return len(self._variables)
[docs] def find_fields_in_resource(self, seed=None, generic=False, **_):
"""
Returns a list of the fields from resource whose name match the given
seed.
:param seed: might be a regular expression, a list of regular expressions
or *None*. If *None* (default), returns the list of all fields in
resource.
:param fieldtype: optional, among ('H2D', 'Misc') or a list of these strings.
If provided, filters out the fields not of the given types.
:param generic: if True, returns complete fid's,
union of {'FORMATname':fieldname} and the according generic fid of
the fields.
"""
if seed is None:
fieldslist = self.listfields()
elif isinstance(seed, six.string_types):
fieldslist = util.find_re_in_list(seed, self.listfields())
elif isinstance(seed, list):
fieldslist = []
for s in seed:
fieldslist += util.find_re_in_list(s, self.listfields())
if fieldslist == []:
raise epygramError("no field matching '" + seed +
"' was found in resource " +
self.container.abspath)
if generic:
raise NotImplementedError("not yet.")
return fieldslist
@FileResource._openbeforedelayed
def ncinfo_field(self, fid):
"""
Get info about the field (dimensions and meta-data of the netCDF variable).
:param fid: netCDF field identifier
"""
assert fid in self.listfields(), 'field: ' + fid + ' not in resource.'
dimensions = OrderedDict()
for d in self._variables[fid].dimensions:
dimensions[d] = len(self._dimensions[d])
metadata = {a:getattr(self._variables[fid], a) for a in self._variables[fid].ncattrs()}
return {'dimensions':dimensions,
'metadata':metadata}
@FileResource._openbeforedelayed
def readfield(self, fid,
getdata=True,
only=None,
adhoc_behaviour=None):
"""
Reads one field, given its netCDF name, and returns a Field instance.
:param fid: netCDF field identifier
:param getdata: if *False*, only metadata are read, the field do not
contain data.
:param only: to specify indexes [0 ... n-1] of specific dimensions,
e.g. {'time':5,} to select only the 6th term of time dimension.
:param adhoc_behaviour: to specify "on the fly" a behaviour (usual
dimensions or grids, ...).
"""
# 0. initialization
assert self.openmode != 'w', \
"cannot read fields in resource if with openmode == 'w'."
assert fid in self.listfields(), \
' '.join(["field", fid, "not found in resource."])
only = util.ifNone_emptydict(only)
adhoc_behaviour = util.ifNone_emptydict(adhoc_behaviour)
field_kwargs = {'fid':{'netCDF':fid}}
variable = self._variables[fid]
behaviour = self.behaviour.copy()
behaviour.update(adhoc_behaviour)
# 1.1 identify usual dimensions
all_dimensions_e2n = {}
for d in self._dimensions.keys():
for sd in config.netCDF_standard_dimensions:
if d in config.netCDF_usualnames_for_standard_dimensions[sd]:
all_dimensions_e2n[sd] = d
variable_dimensions = {d:len(self._dimensions[d]) for d in variable.dimensions}
for d in variable_dimensions.keys():
for sd in config.netCDF_standard_dimensions:
# if behaviour is not explicitly given,
# try to find out who is "d" among the standard dimensions
if sd not in behaviour and d in config.netCDF_usualnames_for_standard_dimensions[sd]:
behaviour[sd] = d
dims_dict_n2e = {}
for d in variable_dimensions.keys():
for k in config.netCDF_standard_dimensions:
if d == behaviour.get(k):
dims_dict_n2e[d] = k
dims_dict_e2n = {v:k for k, v in dims_dict_n2e.items()}
# 1.2 try to identify grids
for f in self.listfields():
for sd in config.netCDF_standard_dimensions:
sg = sd.replace('dimension', 'grid')
# if behaviour is not explicitly given,
# try to find out who is "f" among the standard grids
if sg not in behaviour and f in config.netCDF_usualnames_for_standard_dimensions[sd] or \
f == behaviour.get(sd):
behaviour[sg] = f
# 2. time
def get_validity(T_varname):
try:
validity = FieldValidityList()
validity.pop()
if T_varname not in self._listfields():
raise epygramError('unable to find T_grid in variables.')
T = self._variables[T_varname][:]
if len(self._variables[T_varname].dimensions) == 0:
T = [T]
time_unit = getattr(self._variables[T_varname], 'units', '')
if re.match(r'(hours|seconds|days|minutes)\s+since.+$', time_unit):
T = netCDF4.num2date(T, time_unit).squeeze().reshape((len(T),))
T = [datetime.datetime(*t.timetuple()[:6]) for t in T] # FIXME: not sure of that for dates older than julian/gregorian calendar
basis = netCDF4.num2date(0, time_unit)
if basis.year <= 1582:
epylog.warning('suspicion of inconsistency of julian/gregorian dates')
basis = datetime.datetime(*basis.timetuple()[:6]) # FIXME: not sure of that for dates older than julian/gregorian calendar
for v in T:
validity.append(FieldValidity(date_time=v, basis=basis))
else:
epylog.warning('temporal unit is not CF1.6-compliant, cannot decode.')
for v in T:
validity.append(FieldValidity())
except Exception:
epylog.warning("Failed to read validity, but ignore !")
validity = FieldValidityList()
return validity
if 'T_dimension' in dims_dict_e2n:
# field has a time dimension
var_corresponding_to_T_grid = behaviour.get('T_grid', False)
validity = get_validity(var_corresponding_to_T_grid)
if dims_dict_e2n['T_dimension'] in only:
validity = validity[only[dims_dict_e2n['T_dimension']]]
elif any([t in self._listfields()
for t in config.netCDF_usualnames_for_standard_dimensions['T_dimension']]):
# look for a time variable
T_varnames = [t for t in config.netCDF_usualnames_for_standard_dimensions['T_dimension'] if t in self._listfields()]
_v = get_validity(T_varnames[0])
if len(_v) == 1:
validity = _v
else:
validity = FieldValidity()
else:
validity = FieldValidity()
field_kwargs['validity'] = validity
# 3. GEOMETRY
# ===========
kwargs_geom = {}
kwargs_geom['position_on_horizontal_grid'] = 'center'
# 3.1 identify the structure
keys = copy.copy(list(variable_dimensions.keys()))
for k in only.keys():
if k in keys:
keys.remove(k)
else:
raise ValueError("dimension: " + k + " from 'only' not in field variable.")
if 'T_dimension' in dims_dict_e2n and dims_dict_e2n['T_dimension'] not in only:
keys.remove(dims_dict_e2n['T_dimension'])
squeezed_variables = [dims_dict_n2e.get(k)
for k in keys
if variable_dimensions[k] != 1]
H2D = set(squeezed_variables) == set(['X_dimension',
'Y_dimension']) \
or (set(squeezed_variables) == set(['N_dimension']) and
all([d in all_dimensions_e2n for d in ['X_dimension', # flattened grids
'Y_dimension']])) \
or (set(squeezed_variables) == set(['N_dimension']) \
and behaviour.get('H1D_is_H2D_unstructured', False)) # or 2D unstructured grids
D3 = set(squeezed_variables) == set(['X_dimension',
'Y_dimension',
'Z_dimension']) \
or (set(squeezed_variables) == set(['N_dimension',
'Z_dimension']) and
all([d in all_dimensions_e2n for d in ['X_dimension', # flattened grids
'Y_dimension']])) \
or (set(squeezed_variables) == set(['N_dimension',
'Z_dimension']) \
and behaviour.get('H1D_is_H2D_unstructured', False)) # or 2D unstructured grids
V2D = set(squeezed_variables) == set(['N_dimension',
'Z_dimension']) and not D3
H1D = set(squeezed_variables) == set(['N_dimension']) and not H2D
V1D = set(squeezed_variables) == set(['Z_dimension'])
points = set(squeezed_variables) == set([])
if not any([D3, H2D, V2D, H1D, V1D, points]):
for d in set(variable_dimensions.keys()) - set(dims_dict_n2e.keys()):
epylog.error(" ".join(["dimension:",
d,
"has not been identified as a usual",
"(T,Z,Y,X) dimension. Please specify",
"with readfield() argument",
"adhoc_behaviour={'T_dimension':'" + d + "'}",
"for instance or",
"my_resource.behave(T_dimension=" + d + ")",
"or complete",
"config.netCDF_usualnames_for_standard_dimensions",
"in $HOME/.epygram/userconfig.py"]))
raise epygramError(" ".join(["unable to guess structure of field:",
str(list(variable_dimensions.keys())),
"=> refine behaviour dimensions or",
"filter dimensions with 'only'."]))
elif H1D:
raise NotImplementedError('H1D: not yet.') # TODO:
# 3.2 vertical geometry (default)
default_kwargs_vcoord = {'typeoffirstfixedsurface':255,
'position_on_grid':'mass',
'grid':{'gridlevels': []},
'levels':[0]}
kwargs_vcoord = default_kwargs_vcoord
# 3.3 Specific parts
# 3.3.1 dimensions
dimensions = {}
kwargs_geom['name'] = 'unstructured'
if V2D or H1D:
dimensions['X'] = variable_dimensions[dims_dict_e2n['N_dimension']]
dimensions['Y'] = 1
if V1D or points:
dimensions['X'] = 1
dimensions['Y'] = 1
if D3 or H2D:
if set(['X_dimension', 'Y_dimension']).issubset(set(dims_dict_e2n.keys())):
flattened = False
elif 'N_dimension' in dims_dict_e2n:
flattened = True
else:
raise epygramError('unable to find grid dimensions.')
if not flattened:
dimensions['X'] = variable_dimensions[dims_dict_e2n['X_dimension']]
dimensions['Y'] = variable_dimensions[dims_dict_e2n['Y_dimension']]
else: # flattened
if 'X_dimension' in all_dimensions_e2n \
and 'Y_dimension' in all_dimensions_e2n:
dimensions['X'] = len(self._dimensions[all_dimensions_e2n['X_dimension']])
dimensions['Y'] = len(self._dimensions[all_dimensions_e2n['Y_dimension']])
elif behaviour.get('H1D_is_H2D_unstructured', False):
dimensions['X'] = variable_dimensions[dims_dict_e2n['N_dimension']]
dimensions['Y'] = 1
else:
assert 'X_dimension' in all_dimensions_e2n, \
' '.join(["unable to find X_dimension of field:",
"please specify with readfield() argument",
"adhoc_behaviour={'X_dimension':'" + d + "'}",
"for instance or",
"my_resource.behave(X_dimension=" + d + ")",
"or complete",
"config.netCDF_usualnames_for_standard_dimensions",
"in $HOME/.epygram/userconfig.py"])
assert 'Y_dimension' in all_dimensions_e2n, \
' '.join(["unable to find Y_dimension of field:",
"please specify with readfield() argument",
"adhoc_behaviour={'Y_dimension':'" + d + "'}",
"for instance or",
"my_resource.behave(Y_dimension=" + d + ")",
"or complete",
"config.netCDF_usualnames_for_standard_dimensions",
"in $HOME/.epygram/userconfig.py"])
# 3.3.2 vertical part
if D3 or V1D or V2D:
var_corresponding_to_Z_grid = behaviour.get('Z_grid', dims_dict_e2n['Z_dimension'])
# assert var_corresponding_to_Z_grid in self._variables.keys(), \
# 'unable to find Z_grid in variables.'
levels = None
if var_corresponding_to_Z_grid in self._listfields():
if hasattr(self._variables[var_corresponding_to_Z_grid], 'standard_name') \
and self._variables[var_corresponding_to_Z_grid].standard_name in ('atmosphere_hybrid_sigma_pressure_coordinate',
'atmosphere_hybrid_height_coordinate'):
formula_terms = self._variables[var_corresponding_to_Z_grid].formula_terms.split(' ')
if 'a:' in formula_terms and 'p0:' in formula_terms:
a_name = formula_terms[formula_terms.index('a:') + 1]
p0_name = formula_terms[formula_terms.index('p0:') + 1]
a = self._variables[a_name][:] * self._variables[p0_name][:]
elif 'ap:' in formula_terms:
a_name = formula_terms[formula_terms.index('ap:') + 1]
a = self._variables[a_name][:]
b_name = formula_terms[formula_terms.index('b:') + 1]
b = self._variables[b_name][:]
gridlevels = tuple([(i + 1, FPDict({'Ai':a[i], 'Bi':b[i]})) for i in range(len(a))])
levels = [i + 1 for i in range(variable_dimensions[dims_dict_e2n['Z_dimension']])]
if len(gridlevels) == len(levels):
kwargs_vcoord['grid']['ABgrid_position'] = 'mass'
else:
kwargs_vcoord['grid']['ABgrid_position'] = 'flux'
else:
gridlevels = self._variables[var_corresponding_to_Z_grid][:]
if hasattr(self._variables[behaviour['Z_grid']], 'standard_name'):
kwargs_vcoord['typeoffirstfixedsurface'] = _typeoffirstfixedsurface_dict.get(self._variables[behaviour['Z_grid']].standard_name, 255)
elif hasattr(self._variables[behaviour['Z_grid']], 'short_name'):
kwargs_vcoord['typeoffirstfixedsurface'] = _typeoffirstfixedsurface_short_dict.get(self._variables[behaviour['Z_grid']].short_name, 255)
# TODO: complete the reading of variable units to convert
if hasattr(self._variables[behaviour['Z_grid']], 'units'):
if self._variables[behaviour['Z_grid']].units == 'km':
gridlevels = gridlevels * 1000. # get back to metres
else:
gridlevels = list(range(1, variable_dimensions[dims_dict_e2n['Z_dimension']] + 1))
kwargs_vcoord['typeoffirstfixedsurface'] = 255
kwargs_vcoord['grid']['gridlevels'] = [p for p in gridlevels] # footprints purpose
if levels is None:
kwargs_vcoord['levels'] = kwargs_vcoord['grid']['gridlevels'] # could it be else ?
else:
kwargs_vcoord['levels'] = levels
# 3.3.3 horizontal part
# find grid in variables
geometryclass = UnstructuredGeometry #Default geometry
if H2D or D3:
def find_grid_in_variables():
var_corresponding_to_X_grid = behaviour.get('X_grid', False)
if var_corresponding_to_X_grid not in self._listfields():
epylog.error(" ".join(["unable to find X_grid in variables.",
"Please specify with readfield()",
"argument",
"adhoc_behaviour={'X_grid':'name_of_the_variable'}",
"for instance or",
"my_resource.behave(X_grid='name_of_the_variable')"]))
raise epygramError('unable to find X_grid in variables.')
var_corresponding_to_Y_grid = behaviour.get('Y_grid', False)
if var_corresponding_to_Y_grid not in self._listfields():
epylog.error(" ".join(["unable to find Y_grid in variables.",
"Please specify with readfield()",
"argument",
"adhoc_behaviour={'Y_grid':'name_of_the_variable'}",
"for instance or",
"my_resource.behave(Y_grid='name_of_the_variable')"]))
raise epygramError('unable to find Y_grid in variables.')
else:
if hasattr(self._variables[var_corresponding_to_Y_grid], 'standard_name') and \
self._variables[var_corresponding_to_Y_grid].standard_name == 'projection_y_coordinate' and \
self._variables[var_corresponding_to_X_grid].standard_name == 'projection_x_coordinate':
behaviour['grid_is_lonlat'] = False
elif 'lat' in var_corresponding_to_Y_grid.lower() and \
'lon' in var_corresponding_to_X_grid.lower() and \
'grid_is_lonlat' not in behaviour:
behaviour['grid_is_lonlat'] = True
if len(self._variables[var_corresponding_to_X_grid].dimensions) == 1 and \
len(self._variables[var_corresponding_to_Y_grid].dimensions) == 1:
# case of a flat grid
X = self._variables[var_corresponding_to_X_grid][:]
Y = self._variables[var_corresponding_to_Y_grid][:]
if flattened:
if len(X) == dimensions.get('X') * dimensions.get('Y'):
# case of a H2D field with flattened grid
Xgrid = X.reshape((dimensions['Y'], dimensions['X']))
Ygrid = Y.reshape((dimensions['Y'], dimensions['X']))
elif behaviour.get('H1D_is_H2D_unstructured', False):
# case of a H2D unstructured field
X = self._variables[var_corresponding_to_X_grid][:]
Y = self._variables[var_corresponding_to_Y_grid][:]
Xgrid = X.reshape((1, len(X)))
Ygrid = Y.reshape((1, len(Y)))
else:
raise epygramError('unable to reconstruct 2D grid.')
else:
# case of a regular grid where X is constant on a column
# and Y constant on a row: reconstruct 2D
Xgrid = numpy.ones((Y.size, X.size)) * X
Ygrid = (numpy.ones((Y.size, X.size)).transpose() * Y).transpose()
elif len(self._variables[var_corresponding_to_X_grid].dimensions) == 2 and \
len(self._variables[var_corresponding_to_Y_grid].dimensions) == 2:
Xgrid = self._variables[var_corresponding_to_X_grid][:, :]
Ygrid = self._variables[var_corresponding_to_Y_grid][:, :]
elif len(self._variables[var_corresponding_to_X_grid].dimensions) == 3 and \
len(self._variables[var_corresponding_to_Y_grid].dimensions) == 3:
# In this case, we check that X and Y are constant on Z axis
Xgrid = self._variables[var_corresponding_to_X_grid][:, :, :]
Ygrid = self._variables[var_corresponding_to_Y_grid][:, :, :]
if not all([numpy.all(Xgrid[0] == Xgrid[i] for i in range(len(Xgrid)))]):
raise epygramError('X coordinate must be constant on the vertical')
if not all([numpy.all(Ygrid[0] == Ygrid[i] for i in range(len(Ygrid)))]):
raise epygramError('Y coordinate must be constant on the vertical')
Xgrid = Xgrid[0]
Ygrid = Ygrid[0]
else:
raise epygramError('Unknown case for X and Y dimensions')
if Ygrid[0, 0] > Ygrid[-1, 0] and not behaviour.get('reverse_Yaxis'):
epylog.warning("Ygrid seems to be reversed; shouldn't behaviour['reverse_Yaxis'] be True ?")
elif behaviour.get('reverse_Yaxis'):
Ygrid = Ygrid[::-1, :]
Xgrid = Xgrid[::-1, :]
return Xgrid, Ygrid
# projection or grid
def grid_mapping_ok():
ok = False
if hasattr(variable, 'grid_mapping'):
if variable.grid_mapping in self._variables:
if hasattr(self._variables[variable.grid_mapping], 'grid_mapping_name'):
gmm = self._variables[variable.grid_mapping].grid_mapping_name
if gmm in ('lambert_conformal_conic',
'mercator',
'polar_stereographic',
'latitude_longitude') or 'gauss' in gmm.lower():
ok = True
elif variable.grid_mapping == 'GeosCoordinateSystem':
if 'geos' in self._variables:
if hasattr(self._variables['geos'], 'grid_mapping_name'):
gmm = self._variables['geos'].grid_mapping_name
if gmm == 'vertical_perspective':
ok = True
return ok
if grid_mapping_ok():
# geometry described as "grid_mapping" meta-data
if (variable.grid_mapping == 'GeosCoordinateSystem' and
'geos' in self._variables
and hasattr(self._variables['geos'], 'grid_mapping_name')
and self._variables['geos'].grid_mapping_name == 'vertical_perspective'):
gm = 'geos'
else:
gm = variable.grid_mapping
grid_mapping = self._variables[gm]
if hasattr(grid_mapping, 'ellipsoid'):
kwargs_geom['geoid'] = {'ellps':grid_mapping.ellipsoid}
elif hasattr(grid_mapping, 'earth_radius'):
kwargs_geom['geoid'] = {'a':grid_mapping.earth_radius,
'b':grid_mapping.earth_radius}
elif hasattr(grid_mapping, 'semi_major_axis') and hasattr(grid_mapping, 'semi_minor_axis'):
kwargs_geom['geoid'] = {'a':grid_mapping.semi_major_axis,
'b':grid_mapping.semi_minor_axis}
elif hasattr(grid_mapping, 'semi_major_axis') and hasattr(grid_mapping, 'inverse_flattening'):
kwargs_geom['geoid'] = {'a':grid_mapping.semi_major_axis,
'rf':grid_mapping.inverse_flattening}
else:
kwargs_geom['geoid'] = config.default_geoid
if hasattr(grid_mapping, 'position_on_horizontal_grid'):
kwargs_geom['position_on_horizontal_grid'] = grid_mapping.position_on_horizontal_grid
if grid_mapping.grid_mapping_name in ('lambert_conformal_conic',
'mercator',
'polar_stereographic',
'vertical_perspective'):
if (hasattr(grid_mapping, 'x_resolution') or
not behaviour.get('grid_is_lonlat', False)):
# if resolution is either in grid_mapping attributes or in the grid itself
geometryclass = ProjectedGeometry
kwargs_geom['name'] = _proj_dict_inv[grid_mapping.grid_mapping_name]
if hasattr(grid_mapping, 'x_resolution'):
Xresolution = grid_mapping.x_resolution
Yresolution = grid_mapping.y_resolution
else:
Xgrid, Ygrid = find_grid_in_variables()
if 1 in Xgrid.shape and behaviour.get('H1D_is_H2D_unstructured', False):
raise epygramError('unable to retrieve both X_resolution and Y_resolution from a 1D list of points.')
else:
Xresolution = abs(Xgrid[0, 0] - Xgrid[0, 1])
Yresolution = abs(Ygrid[0, 0] - Ygrid[1, 0])
grid = {'X_resolution':Xresolution,
'Y_resolution':Yresolution,
'LAMzone':None}
import pyproj
if kwargs_geom['name'] == 'lambert':
kwargs_geom['projection'] = {'reference_lon':Angle(grid_mapping.longitude_of_central_meridian, 'degrees'),
'rotation':Angle(0., 'degrees')}
if hasattr(grid_mapping, 'rotation'):
kwargs_geom['projection']['rotation'] = Angle(grid_mapping.rotation, 'degrees')
if isinstance(grid_mapping.standard_parallel, numpy.ndarray):
s1, s2 = grid_mapping.standard_parallel
kwargs_geom['projection']['secant_lat1'] = Angle(s1, 'degrees')
kwargs_geom['projection']['secant_lat2'] = Angle(s2, 'degrees')
else:
r = grid_mapping.standard_parallel
kwargs_geom['projection']['reference_lat'] = Angle(r, 'degrees')
s1 = s2 = r
fe = grid_mapping.false_easting
fn = grid_mapping.false_northing
reference_lat = grid_mapping.latitude_of_projection_origin
# compute x_0, y_0...
p = pyproj.Proj(proj='lcc',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_1=s1, lat_2=s2,
**kwargs_geom['geoid'])
dx, dy = p(kwargs_geom['projection']['reference_lon'].get('degrees'),
reference_lat)
# ... for getting center coords from false_easting, false_northing
p = pyproj.Proj(proj='lcc',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_1=s1, lat_2=s2,
x_0=-dx, y_0=-dy,
**kwargs_geom['geoid'])
ll00 = p(-fe, -fn, inverse=True)
del p
grid['input_lon'] = Angle(ll00[0], 'degrees')
grid['input_lat'] = Angle(ll00[1], 'degrees')
grid['input_position'] = (0, 0)
elif kwargs_geom['name'] == 'mercator':
kwargs_geom['projection'] = {'reference_lon':Angle(grid_mapping.longitude_of_central_meridian, 'degrees'),
'rotation':Angle(0., 'degrees')}
if hasattr(grid_mapping, 'rotation'):
kwargs_geom['projection']['rotation'] = Angle(grid_mapping.rotation, 'degrees')
kwargs_geom['projection']['reference_lat'] = Angle(0., 'degrees')
if grid_mapping.standard_parallel != 0.:
lat_ts = grid_mapping.standard_parallel
kwargs_geom['projection']['secant_lat'] = Angle(lat_ts, 'degrees')
else:
lat_ts = 0.
fe = grid_mapping.false_easting
fn = grid_mapping.false_northing
# compute x_0, y_0...
p = pyproj.Proj(proj='merc',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_ts=lat_ts,
**kwargs_geom['geoid'])
dx, dy = p(kwargs_geom['projection']['reference_lon'].get('degrees'),
0.)
# ... for getting center coords from false_easting, false_northing
p = pyproj.Proj(proj='merc',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_ts=lat_ts,
x_0=-dx, y_0=-dy,
**kwargs_geom['geoid'])
ll00 = p(-fe, -fn, inverse=True)
del p
grid['input_lon'] = Angle(ll00[0], 'degrees')
grid['input_lat'] = Angle(ll00[1], 'degrees')
grid['input_position'] = (0, 0)
elif kwargs_geom['name'] == 'polar_stereographic':
kwargs_geom['projection'] = {'reference_lon':Angle(grid_mapping.straight_vertical_longitude_from_pole, 'degrees'),
'rotation':Angle(0., 'degrees')}
if hasattr(grid_mapping, 'rotation'):
kwargs_geom['projection']['rotation'] = Angle(grid_mapping.rotation, 'degrees')
kwargs_geom['projection']['reference_lat'] = Angle(grid_mapping.latitude_of_projection_origin, 'degrees')
# CF-1.4
if hasattr(grid_mapping, 'scale_factor_at_projection_origin'):
if grid_mapping.scale_factor_at_projection_origin == 1.:
lat_ts = grid_mapping.latitude_of_projection_origin
else:
raise NotImplementedError("CF-1.4 encoding of secant parallel using 'scale_factor_at_projection_origin'.")
# CF-1.6
elif hasattr(grid_mapping, 'standard_parallel'):
lat_ts = grid_mapping.standard_parallel
if grid_mapping.standard_parallel != grid_mapping.latitude_of_projection_origin:
kwargs_geom['projection']['secant_lat'] = Angle(lat_ts, 'degrees')
fe = grid_mapping.false_easting
fn = grid_mapping.false_northing
# compute x_0, y_0...
p = pyproj.Proj(proj='stere',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_0=kwargs_geom['projection']['reference_lat'].get('degrees'),
lat_ts=lat_ts,
**kwargs_geom['geoid'])
dx, dy = p(kwargs_geom['projection']['reference_lon'].get('degrees'),
kwargs_geom['projection']['reference_lat'].get('degrees'),)
# ... for getting center coords from false_easting, false_northing
p = pyproj.Proj(proj='stere',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_0=kwargs_geom['projection']['reference_lat'].get('degrees'),
lat_ts=lat_ts,
x_0=-dx, y_0=-dy,
**kwargs_geom['geoid'])
ll00 = p(-fe, -fn, inverse=True)
del p
grid['input_lon'] = Angle(ll00[0], 'degrees')
grid['input_lat'] = Angle(ll00[1], 'degrees')
grid['input_position'] = (0, 0)
elif kwargs_geom['name'] == 'space_view':
kwargs_geom['projection'] = {'reference_lon':Angle(grid_mapping.longitude_of_central_meridian, 'degrees'),
'rotation':Angle(0., 'degrees')}
if hasattr(grid_mapping, 'rotation'):
kwargs_geom['projection']['rotation'] = Angle(grid_mapping.rotation, 'degrees')
r = grid_mapping.latitude_of_projection_origin
kwargs_geom['projection']['satellite_lat'] = Angle(r, 'degrees')
kwargs_geom['projection']['satellite_lon'] = Angle(grid_mapping.longitude_of_central_meridian, 'degrees')
kwargs_geom['projection']['satellite_height'] = grid_mapping.height_above_earth
fe = grid_mapping.false_easting
fn = grid_mapping.false_northing
reference_lat = grid_mapping.standard_parallel
# compute x_0, y_0...
p = pyproj.Proj(proj='geos',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
h=kwargs_geom['projection']['satellite_height'],
**kwargs_geom['geoid'])
dx, dy = p(kwargs_geom['projection']['reference_lon'].get('degrees'),
reference_lat)
# ... for getting center coords from false_easting, false_northing
p = pyproj.Proj(proj='geos',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
h=kwargs_geom['projection']['satellite_height'],
x_0=-dx, y_0=-dy,
**kwargs_geom['geoid'])
ll00 = p(-fe, -fn, inverse=True)
del p
grid['input_lon'] = Angle(ll00[0], 'degrees')
grid['input_lat'] = Angle(ll00[1], 'degrees')
grid['input_position'] = (0, 0)
else:
# no resolution available: grid mapping is useless
gm = None
elif 'gauss' in grid_mapping.grid_mapping_name.lower():
if hasattr(grid_mapping, 'latitudes'):
latitudes = self._variables[grid_mapping.latitudes.split(' ')[1]][:]
else:
# NOTE: this is a (good) approximation actually, the true latitudes are the roots of Legendre polynoms
raise NotImplementedError('(re-)computation of Gauss latitudes (not in file metadata).')
grid = {'latitudes':FPList([Angle(l, 'degrees') for l in latitudes]),
'dilatation_coef':1.}
if hasattr(grid_mapping, 'lon_number_by_lat'):
geometryclass = GaussGeometry
if isinstance(grid_mapping.lon_number_by_lat, six.string_types):
lon_number_by_lat = self._variables[grid_mapping.lon_number_by_lat.split(' ')[1]][:]
else:
kwargs_geom['name'] = 'regular_gauss'
lon_number_by_lat = [dimensions['X'] for _ in range(dimensions['Y'])]
if hasattr(grid_mapping, 'pole_lon'):
kwargs_geom['name'] = 'rotated_reduced_gauss'
grid['pole_lon'] = Angle(grid_mapping.pole_lon, 'degrees')
grid['pole_lat'] = Angle(grid_mapping.pole_lat, 'degrees')
if hasattr(grid_mapping, 'dilatation_coef'):
grid['dilatation_coef'] = grid_mapping.dilatation_coef
else:
kwargs_geom['name'] = 'reduced_gauss'
dimensions = {'max_lon_number':int(max(lon_number_by_lat)),
'lat_number':len(latitudes),
'lon_number_by_lat':FPList([int(n) for n in
lon_number_by_lat])}
elif grid_mapping.grid_mapping_name == 'latitude_longitude':
# try to find out longitude, latitude arrays
for f in config.netCDF_usualnames_for_lonlat_grids['X']:
if f in self.listfields():
behaviour['X_grid'] = f
break
for f in config.netCDF_usualnames_for_lonlat_grids['Y']:
if f in self.listfields():
behaviour['Y_grid'] = f
break
Xgrid, Ygrid = find_grid_in_variables()
grid = {'longitudes':Xgrid,
'latitudes':Ygrid}
if ((hasattr(self._variables[variable.grid_mapping],
'x_resolution') and
hasattr(self._variables[variable.grid_mapping],
'y_resolution') or
hasattr(self._variables[variable.grid_mapping],
'resolution')) and
len(Xgrid.shape) == 2):
# then this is a regular lon lat
geometryclass = RegLLGeometry
kwargs_geom['name'] = 'regular_lonlat'
if hasattr(self._variables[variable.grid_mapping],
'x_resolution'):
x_res = grid_mapping.x_resolution
else:
x_res = grid_mapping.resolution
if hasattr(self._variables[variable.grid_mapping],
'y_resolution'):
y_res = grid_mapping.y_resolution
else:
y_res = grid_mapping.resolution
grid = {'input_lon':Angle(Xgrid[0, 0], 'degrees'),
'input_lat':Angle(Ygrid[0, 0], 'degrees'),
'input_position':(0, 0),
'X_resolution':Angle(x_res, 'degrees'),
'Y_resolution':Angle(y_res, 'degrees')}
else:
raise NotImplementedError('grid_mapping.grid_mapping_name == ' +
grid_mapping.grid_mapping_name)
else:
if hasattr(variable, 'grid_mapping'):
epylog.info('grid_mapping ignored: unknown case')
# grid only in variables
Xgrid, Ygrid = find_grid_in_variables()
if behaviour.get('grid_is_lonlat', False):
regll = False
if len(Xgrid.shape) == 2 and len(Ygrid.shape) == 2:
loose_epsilon = 1e-12 # config.epsilon was potentially too narrow
x_res = Xgrid[:,1:] - Xgrid[:,:-1]
y_res = Ygrid[1:,:] - Ygrid[:-1,:]
if Xgrid.shape[0] == 1: # dim 1 on Y
y_res = x_res
elif Ygrid.shape[1] == 1: # dim 1 on X
x_res = y_res
x_regular = numpy.all(x_res - x_res[0, 0] <= loose_epsilon)
y_regular = numpy.all(y_res - y_res[0, 0] <= loose_epsilon)
regll = x_regular and y_regular
if regll:
geometryclass = RegLLGeometry
kwargs_geom['name'] = 'regular_lonlat'
grid = {'input_lon': Angle(Xgrid[0, 0], 'degrees'),
'input_lat': Angle(Ygrid[0, 0], 'degrees'),
'input_position': (0, 0),
'X_resolution': Angle(x_res[0, 0], 'degrees'),
'Y_resolution': Angle(y_res[0, 0], 'degrees')}
else:
grid = {'longitudes':Xgrid,
'latitudes':Ygrid}
else:
# grid is not lon/lat and no other metadata available : Academic
geometryclass = AcademicGeometry
kwargs_geom['name'] = 'academic'
grid = {'LAMzone':None,
'X_resolution':abs(Xgrid[0, 1] - Xgrid[0, 0]),
'Y_resolution':abs(Ygrid[1, 0] - Ygrid[0, 0]),
'input_lat':1.,
'input_lon':1.,
'input_position':(0, 0)}
kwargs_geom['projection'] = {'reference_dX':grid['X_resolution'],
'reference_dY':grid['Y_resolution'],
'rotation': Angle(0, 'degrees')}
elif V1D or V2D or points:
var_corresponding_to_X_grid = behaviour.get('X_grid', False)
if var_corresponding_to_X_grid not in self._listfields():
if points or V1D:
lon = ['_']
else:
raise epygramError('unable to find X_grid in variables.')
else:
lon = self._variables[var_corresponding_to_X_grid][:]
var_corresponding_to_Y_grid = behaviour.get('Y_grid', False)
if var_corresponding_to_Y_grid not in self._listfields():
if points or V1D:
lat = ['_']
else:
raise epygramError('unable to find Y_grid in variables.')
else:
lat = self._variables[var_corresponding_to_Y_grid][:]
grid = {'longitudes':lon,
'latitudes':lat}
# 3.4 build geometry
vcoordinate = VGeometry(**kwargs_vcoord)
kwargs_geom['grid'] = grid
kwargs_geom['dimensions'] = dimensions
kwargs_geom['vcoordinate'] = vcoordinate
geometry = geometryclass(**kwargs_geom)
# 4. build field
field_kwargs['geometry'] = geometry
field_kwargs['structure'] = geometry.structure
comment = {}
for a in variable.ncattrs():
if a != 'validity':
# CLEANME: all these conversions might be useless since the use of _default_numpy2json ?
if isinstance(variable.getncattr(a), numpy.float32): # pb with json and float32
comment.update({a:numpy.float64(variable.getncattr(a))})
elif isinstance(variable.getncattr(a), numpy.int32): # pb with json and int32
comment.update({a:numpy.int64(variable.getncattr(a))})
elif isinstance(variable.getncattr(a), numpy.ndarray): # pb with json and numpy arrays
comment.update({a:numpy.float64(variable.getncattr(a)).tolist()})
elif (isinstance(variable.getncattr(a), numpy.int16) or
isinstance(variable.getncattr(a), numpy.uint16)): # pb with json and int16
comment.update({a:numpy.int64(variable.getncattr(a))})
elif isinstance(variable.getncattr(a), numpy.int8): # pb with json and int8
comment.update({a:numpy.int64(variable.getncattr(a))})
else:
comment.update({a:variable.getncattr(a)})
comment = json.dumps(comment, default=_default_numpy2json)
if comment != '{}':
field_kwargs['comment'] = comment
field = fpx.field(**field_kwargs)
if getdata:
if only:
n = len(variable.dimensions)
buffdata = variable
for k, i in only.items():
d = variable.dimensions.index(k)
if isinstance(i, slice):
ibuff = [util.restrain_to_index_i_of_dim_d(buffdata, j, d, n=n) for j in
range(i.start, i.stop, i.step)]
buffdata = numpy.concatenate(ibuff, axis=d)
else:
buffdata = util.restrain_to_index_i_of_dim_d(buffdata, i, d, n=n)
else:
buffdata = variable[...]
# check there is no leftover unknown dimension
field_dim_num = 1 if len(field.validity) > 1 else 0
if field.structure != 'Point':
field_dim_num += [int(c) for c in field.structure if c.isdigit()][0]
if (H2D or D3) and flattened:
field_dim_num -= 1
assert field_dim_num == len(buffdata.squeeze().shape), \
' '.join(['shape of field and identified usual dimensions',
'do not match: use *only* to filter or',
'*adhoc_behaviour* to identify dimensions'])
# re-shuffle to have data indexes in order (t,z,y,x)
positions = []
shp4D = [1, 1, 1, 1]
if 'T_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['T_dimension'])
positions.append(idx)
shp4D[0] = buffdata.shape[idx]
if 'Z_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['Z_dimension'])
positions.append(idx)
shp4D[1] = buffdata.shape[idx]
if 'Y_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['Y_dimension'])
positions.append(idx)
shp4D[2] = buffdata.shape[idx]
if 'X_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['X_dimension'])
positions.append(idx)
shp4D[3] = buffdata.shape[idx]
elif 'N_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['N_dimension'])
positions.append(idx)
shp4D[3] = buffdata.shape[idx]
for d in variable.dimensions:
# whatever the order of these, they must have been filtered and dimension 1 (only)
if d not in dims_dict_e2n.values():
positions.append(variable.dimensions.index(d))
shp4D = tuple(shp4D)
buffdata = buffdata.transpose(*positions).squeeze()
if isinstance(buffdata, numpy.ma.masked_array):
data = numpy.ma.zeros(shp4D)
else:
data = numpy.empty(shp4D)
if (H2D or D3) and flattened:
if len(buffdata.shape) == 2:
if D3:
first_dimension = 'Z'
else:
first_dimension = 'T'
else:
first_dimension = None
data = geometry.reshape_data(buffdata, first_dimension=first_dimension, d4=True)
else:
data[...] = buffdata.reshape(data.shape)
if behaviour.get('reverse_Yaxis'):
data[...] = data[:, :, ::-1, :]
field.setdata(data)
return field
[docs] def writefield(self, field,
compression=4,
metadata=None,
adhoc_behaviour=None,
fill_value=config.netCDF_default_variables_fill_value):
"""
Write a field in resource.
:param field: the :class:`~epygram.base.Field` object to write
:param compression ranges from 1 (low compression, fast writing)
to 9 (high compression, slow writing). 0 is no compression.
:param metadata: dict, can be filled by any meta-data, that will be stored
as attribute of the netCDF variable.
:param adhoc_behaviour: to specify "on the fly" a behaviour (usual
dimensions or grids, ...).
"""
metadata = util.ifNone_emptydict(metadata)
adhoc_behaviour = util.ifNone_emptydict(adhoc_behaviour)
behaviour = self.behaviour.copy()
behaviour.update(adhoc_behaviour)
def check_or_add_dim(d, d_in_field=None, size=None):
if size is None:
if d_in_field is None:
d_in_field = d
size = field.geometry.dimensions[d_in_field]
if d not in self._dimensions:
self._nc.createDimension(d, size=size)
else:
assert len(self._dimensions[d]) == size, \
"dimensions mismatch: " + d + ": " + \
str(self._dimensions[d]) + " != " + str(size)
def check_or_add_variable(varname, vartype,
dimensions=(),
**kwargs):
if six.text_type(varname) not in self._listfields():
var = self._nc.createVariable(varname, vartype,
dimensions=dimensions,
**kwargs)
status = 'created'
else:
assert self._variables[varname].dtype == vartype, \
' '.join(['variable', varname,
'already exist with other type:',
str(self._variables[varname].dtype)])
if isinstance(dimensions, six.string_types):
dimensions = (dimensions,)
assert self._variables[varname].dimensions == tuple(dimensions), \
' '.join(['variable', varname,
'already exist with other dimensions:',
str(self._variables[varname].dimensions)])
var = self._variables[varname]
status = 'match'
return var, status
assert 'netCDF' in field.fid
assert not field.spectral
# 1. dimensions
T = Y = X = G = N = None
dims = []
# time
if len(field.validity) > 1 or behaviour.get('force_a_T_dimension', False):
# default
T = config.netCDF_usualnames_for_standard_dimensions['T_dimension'][0]
# or any existing identified time dimension
T = {'found':v for v in self._dimensions
if (v in config.netCDF_usualnames_for_standard_dimensions['T_dimension'] and
len(self._dimensions[v]) == len(field.validity))}.get('found', T)
# or specified behaviour
T = behaviour.get('T_dimension', T)
check_or_add_dim(T, size=len(field.validity))
# vertical part
# default
Z = config.netCDF_usualnames_for_standard_dimensions['Z_dimension'][0]
# or any existing identified time dimension
Z = {'found':v for v in self._dimensions
if (v in config.netCDF_usualnames_for_standard_dimensions['Z_dimension'] and
len(self._dimensions[v]) == len(field.geometry.vcoordinate.levels))}.get('found', Z)
# or specified behaviour
Z = behaviour.get('Z_dimension', Z)
if 'gridlevels' in field.geometry.vcoordinate.grid:
Z_gridsize = max(len(field.geometry.vcoordinate.grid['gridlevels']), 1)
if field.geometry.vcoordinate.typeoffirstfixedsurface in (118, 119) and \
field.geometry.vcoordinate.grid.get('ABgrid_position') != \
field.geometry.vcoordinate.position_on_grid:
Z_gridsize -= 1
else:
Z_gridsize = len(field.geometry.vcoordinate.levels) # 1
if Z_gridsize > 1:
check_or_add_dim(Z, size=Z_gridsize)
# horizontal
if behaviour.get('flatten_horizontal_grids', False):
_gpn = field.geometry.gridpoints_number
G = 'gridpoints_number'
check_or_add_dim(G, size=_gpn)
if field.geometry.rectangular_grid:
if field.geometry.dimensions['Y'] > 1 and field.geometry.dimensions['X'] > 1:
Y = behaviour.get('Y_dimension',
config.netCDF_usualnames_for_standard_dimensions['Y_dimension'][0])
check_or_add_dim(Y, d_in_field='Y')
X = behaviour.get('X_dimension',
config.netCDF_usualnames_for_standard_dimensions['X_dimension'][0])
check_or_add_dim(X, d_in_field='X')
elif field.geometry.dimensions['X'] > 1:
N = behaviour.get('N_dimension',
config.netCDF_usualnames_for_standard_dimensions['N_dimension'][0])
check_or_add_dim(N, d_in_field='X')
elif 'gauss' in field.geometry.name:
Y = behaviour.get('Y_dimension', 'latitude')
check_or_add_dim(Y, d_in_field='lat_number')
X = behaviour.get('X_dimension', 'longitude')
check_or_add_dim(X, d_in_field='max_lon_number')
else:
raise NotImplementedError("grid not rectangular nor a gauss one.")
# 2. validity
# TODO: deal with unlimited time dimension ?
if field.validity[0] != FieldValidity():
tgrid = config.netCDF_usualnames_for_standard_dimensions['T_dimension'][0]
tgrid = {'found':v for v in self._variables
if v in config.netCDF_usualnames_for_standard_dimensions['T_dimension']}.get('found', tgrid)
tgrid = behaviour.get('T_grid', tgrid)
if len(field.validity) > 1 or behaviour.get('force_a_T_dimension', False):
_, _status = check_or_add_variable(tgrid, float, T)
dims.append(T) # FIXME: T instead of tgrid ?
else:
_, _status = check_or_add_variable(tgrid, float)
datetime0 = field.validity[0].getbasis().isoformat(sep=str(' '))
datetimes = [int((dt.get() - field.validity[0].getbasis()).total_seconds()) for dt in field.validity]
if _status == 'created':
self._variables[tgrid][:] = datetimes
self._variables[tgrid].units = ' '.join(['seconds', 'since', datetime0])
else:
units = self._variables[tgrid].units.split(sep=' ')
units_base = datetime.datetime.strptime(' '.join(units[2:4]), '%Y-%m-%d %H:%M:%S')
if field.validity[0].getbasis() != units_base:
basediff = field.validity[0].getbasis() - units_base
datetimes = [dt + int(basediff.total_seconds()) for dt in datetimes]
assert (self._variables[tgrid][:] == datetimes).all(), \
' '.join(['variable', tgrid, 'mismatch.'])
# 3. geometry
# 3.1 vertical part
if len(field.geometry.vcoordinate.levels) > 1:
dims.append(Z)
if Z_gridsize > 1:
zgridname = config.netCDF_usualnames_for_standard_dimensions['Z_dimension'][0]
zgridname = {'found':v for v in self._variables
if v in config.netCDF_usualnames_for_standard_dimensions['Z_dimension']}.get('found', zgridname)
zgridname = behaviour.get('Z_grid', zgridname)
if field.geometry.vcoordinate.typeoffirstfixedsurface in (118, 119):
if field.geometry.vcoordinate.grid.get('ABgrid_position') == 'mass' and \
field.geometry.vcoordinate.position_on_grid == 'mass':
ZP1 = Z
check_or_add_dim(ZP1, size=Z_gridsize)
else:
ZP1 = Z + '+1'
check_or_add_dim(ZP1, size=Z_gridsize + 1)
zgrid, _status = check_or_add_variable(zgridname, int)
if _status == 'created':
zgrid.standard_name = _typeoffirstfixedsurface_dict_inv[field.geometry.vcoordinate.typeoffirstfixedsurface]
if field.geometry.vcoordinate.typeoffirstfixedsurface == 119:
zgrid.positive = "down"
zgrid.formula_terms = "ap: hybrid_coef_A b: hybrid_coef_B ps: surface_air_pressure"
check_or_add_variable('hybrid_coef_A',
behaviour.get('meta_var_type',
config.netCDF_default_metavariables_dtype),
ZP1)
self._variables['hybrid_coef_A'][:] = [iab[1]['Ai'] for iab in field.geometry.vcoordinate.grid['gridlevels']]
check_or_add_variable('hybrid_coef_B',
behaviour.get('meta_var_type',
config.netCDF_default_metavariables_dtype),
ZP1)
self._variables['hybrid_coef_B'][:] = [iab[1]['Bi'] for iab in field.geometry.vcoordinate.grid['gridlevels']]
elif field.geometry.vcoordinate.typeoffirstfixedsurface == 118:
# TOBECHECKED:
zgrid.positive = "up"
zgrid.formula_terms = "a: hybrid_coef_A b: hybrid_coef_B orog: orography"
check_or_add_variable('hybrid_coef_A',
behaviour.get('meta_var_type',
config.netCDF_default_metavariables_dtype),
ZP1)
self._variables['hybrid_coef_A'][:] = [iab[1]['Ai'] for iab in field.geometry.vcoordinate.grid['gridlevels']]
check_or_add_variable('hybrid_coef_B',
behaviour.get('meta_var_type',
config.netCDF_default_metavariables_dtype),
ZP1)
self._variables['hybrid_coef_B'][:] = [iab[1]['Bi'] for iab in field.geometry.vcoordinate.grid['gridlevels']]
else:
epylog.info('assume 118/119 type vertical grid matches.')
else:
if len(numpy.shape(field.geometry.vcoordinate.levels)) > 1:
dims_Z = [d for d in [Z, Y, X, G, N] if d is not None]
else:
dims_Z = Z
zgrid, _status = check_or_add_variable(zgridname,
behaviour.get('vartype',
config.netCDF_default_variables_dtype),
dims_Z)
u = {102:'m', 103:'m', 100:'hPa'}.get(field.geometry.vcoordinate.typeoffirstfixedsurface, None)
if u is not None:
zgrid.units = u
if _status == 'created':
zgrid[:] = field.geometry.vcoordinate.levels
else:
assert zgrid[:].all() == numpy.array(field.geometry.vcoordinate.levels).all(), \
' '.join(['variable', zgrid, 'mismatch.'])
if _typeoffirstfixedsurface_short_dict_inv.get(field.geometry.vcoordinate.typeoffirstfixedsurface, False):
zgrid.short_name = _typeoffirstfixedsurface_short_dict_inv[field.geometry.vcoordinate.typeoffirstfixedsurface]
# 3.2 grid (lonlat)
dims_lonlat = []
(lons, lats) = field.geometry.get_lonlat_grid()
if behaviour.get('flatten_horizontal_grids'):
dims_lonlat.append(G)
dims.append(G)
lons = stretch_array(lons)
lats = stretch_array(lats)
elif field.geometry.dimensions.get('Y', field.geometry.dimensions.get('lat_number', 0)) > 1: # both Y and X dimensions
dims_lonlat.extend([Y, X])
dims.extend([Y, X])
elif field.geometry.dimensions['X'] > 1: # only X ==> N
dims_lonlat.append(N)
dims.append(N)
# else: pass (single point or profile)
if isinstance(lons, numpy.ma.masked_array):
lonlat_fill_value = fill_value
lons = lons.filled(lonlat_fill_value)
lats = lats.filled(lonlat_fill_value)
else:
lonlat_fill_value = None
try:
_ = float(stretch_array(lons)[0])
except ValueError:
write_lonlat_grid = False
else:
write_lonlat_grid = behaviour.get('write_lonlat_grid', True)
if write_lonlat_grid:
lons_var, _status = check_or_add_variable(behaviour.get('X_grid', 'longitude'),
behaviour.get('vartype',
config.netCDF_default_variables_dtype),
dims_lonlat,
fill_value=lonlat_fill_value)
lats_var, _status = check_or_add_variable(behaviour.get('Y_grid', 'latitude'),
behaviour.get('vartype',
config.netCDF_default_variables_dtype),
dims_lonlat,
fill_value=lonlat_fill_value)
if _status == 'match':
epylog.info('assume lons/lats match.')
else:
lons_var[...] = lons[...]
lons_var.units = 'degrees'
lats_var[...] = lats[...]
lats_var.units = 'degrees'
# 3.3 meta-data
def set_ellipsoid(meta):
if 'ellps' in field.geometry.geoid:
self._variables[meta].ellipsoid = field.geometry.geoid['ellps']
elif field.geometry.geoid.get('a', False) == field.geometry.geoid.get('b', True):
self._variables[meta].earth_radius = field.geometry.geoid['a']
elif field.geometry.geoid.get('a', False) and field.geometry.geoid.get('b', False):
self._variables[meta].semi_major_axis = field.geometry.geoid['a']
self._variables[meta].semi_minor_axis = field.geometry.geoid['b']
elif field.geometry.geoid.get('a', False) and field.geometry.geoid.get('rf', False):
self._variables[meta].semi_major_axis = field.geometry.geoid['a']
self._variables[meta].inverse_flattening = field.geometry.geoid['rf']
else:
raise NotImplementedError('this kind of geoid:' + str(field.geometry.geoid))
if field.geometry.dimensions.get('Y', field.geometry.dimensions.get('lat_number', 0)) > 1:
if 'gauss' in field.geometry.name:
# reduced Gauss case
meta = 'Gauss_grid'
_, _status = check_or_add_variable(meta, int)
if _status == 'created':
self._variables[meta].grid_mapping_name = field.geometry.name + "_grid"
set_ellipsoid(meta)
if 'reduced' in field.geometry.name:
self._variables[meta].lon_number_by_lat = 'var: lon_number_by_lat'
check_or_add_variable('lon_number_by_lat', int, Y)
self._variables['lon_number_by_lat'][:] = field.geometry.dimensions['lon_number_by_lat']
self._variables[meta].latitudes = 'var: gauss_latitudes'
check_or_add_variable('gauss_latitudes', float, Y)
self._variables['gauss_latitudes'][:] = [l.get('degrees') for l in field.geometry.grid['latitudes']]
if 'pole_lon' in field.geometry.grid:
self._variables[meta].pole_lon = field.geometry.grid['pole_lon'].get('degrees')
self._variables[meta].pole_lat = field.geometry.grid['pole_lat'].get('degrees')
if 'dilatation_coef' in field.geometry.grid:
self._variables[meta].dilatation_coef = field.geometry.grid['dilatation_coef']
else:
epylog.info('assume Gauss grid parameters match.')
elif field.geometry.projected_geometry:
# projections
if field.geometry.name in ('lambert', 'mercator', 'polar_stereographic'):
meta = 'Projection_parameters'
_, _status = check_or_add_variable(meta, int)
if _status == 'created':
self._variables[meta].grid_mapping_name = _proj_dict[field.geometry.name]
set_ellipsoid(meta)
if field.geometry.position_on_horizontal_grid != 'center':
self._variables[meta].position_on_horizontal_grid = field.geometry.position_on_horizontal_grid
self._variables[meta].x_resolution = field.geometry.grid['X_resolution']
self._variables[meta].y_resolution = field.geometry.grid['Y_resolution']
if field.geometry.grid.get('LAMzone'):
_lon_cen, _lat_cen = field.geometry.ij2ll(float(field.geometry.dimensions['X'] - 1.) / 2.,
float(field.geometry.dimensions['Y'] - 1.) / 2.)
else:
_lon_cen = field.geometry._center_lon.get('degrees')
_lat_cen = field.geometry._center_lat.get('degrees')
if field.geometry.name == 'lambert':
if field.geometry.secant_projection:
std_parallel = [field.geometry.projection['secant_lat1'].get('degrees'),
field.geometry.projection['secant_lat2'].get('degrees')]
latitude_of_projection_origin = (std_parallel[0] + std_parallel[1]) / 2.
else:
std_parallel = field.geometry.projection['reference_lat'].get('degrees')
latitude_of_projection_origin = std_parallel
x00, y00 = field.geometry.ij2xy(0, 0)
x0, y0 = field.geometry.ll2xy(field.geometry.projection['reference_lon'].get('degrees'),
latitude_of_projection_origin)
(dx, dy) = (x00 - x0, y00 - y0)
self._variables[meta].longitude_of_central_meridian = field.geometry.projection['reference_lon'].get('degrees')
self._variables[meta].latitude_of_projection_origin = latitude_of_projection_origin
self._variables[meta].standard_parallel = std_parallel
self._variables[meta].false_easting = -dx
self._variables[meta].false_northing = -dy
elif field.geometry.name == 'mercator':
if field.geometry.secant_projection:
std_parallel = field.geometry.projection['secant_lat'].get('degrees')
else:
std_parallel = field.geometry.projection['reference_lat'].get('degrees')
x00, y00 = field.geometry.ij2xy(0, 0)
x0, y0 = field.geometry.ll2xy(field.geometry.projection['reference_lon'].get('degrees'), 0.)
(dx, dy) = (x00 - x0, y00 - y0)
self._variables[meta].longitude_of_central_meridian = field.geometry.projection['reference_lon'].get('degrees')
self._variables[meta].standard_parallel = std_parallel
self._variables[meta].false_easting = -dx
self._variables[meta].false_northing = -dy
elif field.geometry.name == 'polar_stereographic':
if field.geometry.secant_projection:
std_parallel = field.geometry.projection['secant_lat'].get('degrees')
else:
std_parallel = field.geometry.projection['reference_lat'].get('degrees')
x00, y00 = field.geometry.ij2xy(0, 0)
x0, y0 = field.geometry.ll2xy(field.geometry.projection['reference_lon'].get('degrees'),
field.geometry.projection['reference_lat'].get('degrees'))
(dx, dy) = (x00 - x0, y00 - y0)
self._variables[meta].straight_vertical_longitude_from_pole = field.geometry.projection['reference_lon'].get('degrees')
self._variables[meta].latitude_of_projection_origin = field.geometry.projection['reference_lat'].get('degrees')
self._variables[meta].standard_parallel = std_parallel
self._variables[meta].false_easting = -dx
self._variables[meta].false_northing = -dy
else:
epylog.info('assume projection parameters match.')
else:
raise NotImplementedError('field.geometry.name == ' + field.geometry.name)
else:
meta = False
else:
meta = False
# 4. Variable
varname = field.fid['netCDF'].replace('.', config.netCDF_replace_dot_in_variable_names)
_, _status = check_or_add_variable(varname,
behaviour.get('vartype',
config.netCDF_default_variables_dtype),
dims,
zlib=bool(compression),
complevel=compression,
fill_value=fill_value)
if meta:
self._variables[varname].grid_mapping = meta
if field.geometry.vcoordinate.typeoffirstfixedsurface in (118, 119):
self._variables[varname].vertical_grid = zgridname
data = field.getdata(d4=True)
if isinstance(data, numpy.ma.masked_array):
if 'gauss' in field.geometry.name:
data = field.geometry.fill_maskedvalues(data)
else:
data = data.filled(fill_value)
if behaviour.get('flatten_horizontal_grids'):
data = field.geometry.horizontally_flattened(data)
data = data.squeeze()
if _status == 'match':
epylog.info('overwrite data in variable ' + varname)
self._variables[varname][...] = data
if field.units not in (None, ''):
self._variables[varname].units = field.units
# 5. metadata
for k, v in metadata.items():
self._variables[varname].setncattr(k, v)
[docs] def set_default_global_attributes(self):
"""
Set default global attributes (those from
config.netCDF_default_global_attributes).
"""
self._nc.setncatts(config.netCDF_default_global_attributes)
[docs] def set_global_attributes(self, **attributes):
"""Set the given global attributes."""
self._nc.setncatts(attributes)
[docs] def behave(self, **kwargs):
"""
Set-up the given arguments in self.behaviour, for the purpose of
building fields from netCDF.
"""
self.behaviour.update(kwargs)
# adapted from http://schubert.atmos.colostate.edu/~cslocum/netcdf_example.html
[docs] def ncdump(self, out):
"""
Outputs dimensions, variables and their attribute information.
The information is similar to that of NCAR's ncdump utility.
:param out: IO where nc_attrs, nc_dims, and nc_vars are printed
Returns
-------
nc_attrs : list
A Python list of the NetCDF file global attributes
nc_dims : list
A Python list of the NetCDF file dimensions
nc_vars : list
A Python list of the NetCDF file variables
"""
nc = self._nc
def outwrite(*items):
items = list(items)
stritem = items.pop(0)
for i in items:
stritem += ' ' + str(i)
out.write(stritem + '\n')
def print_ncattr(key):
"""
Prints the NetCDF file attributes for a given key
Parameters
----------
key : unicode
a valid netCDF4.Dataset.variables key
"""
try:
outwrite("\t\ttype:", repr(nc.variables[key].dtype))
for ncattr in nc.variables[key].ncattrs():
outwrite('\t\t%s:' % ncattr,
repr(nc.variables[key].getncattr(ncattr)))
except KeyError:
outwrite("\t\tWARNING: %s does not contain variable attributes" % key)
# NetCDF global attributes
nc_attrs = nc.ncattrs()
outwrite("NetCDF Global Attributes:")
for nc_attr in nc_attrs:
outwrite('\t%s:' % nc_attr, repr(nc.getncattr(nc_attr)))
nc_dims = [dim for dim in nc.dimensions] # list of nc dimensions
# Dimension shape information.
outwrite("NetCDF dimension information:")
for dim in nc_dims:
outwrite("\tName:", dim)
outwrite("\t\tsize:", len(nc.dimensions[dim]))
# print_ncattr(dim)
# Variable information.
nc_vars = [var for var in nc.variables] # list of nc variables
outwrite("NetCDF variable information:")
for var in nc_vars:
outwrite('\tName:', var)
outwrite("\t\tdimensions:", nc.variables[var].dimensions)
outwrite("\t\tsize:", nc.variables[var].size)
print_ncattr(var)
return nc_attrs, nc_dims, nc_vars
[docs] def what(self, out=sys.stdout, **_):
"""Writes in file a summary of the contents of the GRIB."""
out.write("### FORMAT: " + self.format + "\n")
out.write("\n")
self.ncdump(out)
def _listfields(self):
"""Returns the fid list of the fields inside the resource."""
return list(self._variables.keys())
@property
@FileResource._openbeforedelayed
def _dimensions(self):
return self._nc.dimensions
@property
@FileResource._openbeforedelayed
def _variables(self):
return self._nc.variables