#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains the class for DDH-in-LFA format.
"""
from __future__ import print_function, absolute_import, unicode_literals, division
import math
import numpy
import datetime
import sys
import footprints
from footprints import FPDict, FPList
from bronx.meteo.constants import g0
from epygram.extra.falfilfa4py import LFA as LFA4py
from epygram import config, epygramError
from epygram.util import Angle, write_formatted, separation_line
from epygram.base import FieldValidity, FieldValidityList, FieldSet
from .LFA import LFA
from epygram.geometries import UnstructuredGeometry, VGeometry
from epygram.fields import V1DField, PointField, MiscField
from epygram.resources import FileResource
from epygram import profiles
__all__ = ['DDHLFA']
epylog = footprints.loggers.getLogger(__name__)
[docs]class DDHLFA(LFA):
"""
Class implementing all specificities for DDHLFA resource format.
"""
_footprint = dict(
attr=dict(
format=dict(
values=set(['DDHLFA']),
default='DDHLFA'),
validity=dict(
type=FieldValidityList,
access='rwx',
optional=True,
info="Describes the temporal validity of the resource."),
domains=dict(
type=FPDict,
optional=True,
info="Describes the domains covered by the resource."),
levels=dict(
type=FPDict,
optional=True,
info="Number of levels for variables/tendencies ('VT') and\
fluxes ('F')."),
xpid=dict(
optional=True,
info="Experiment identifier.")
)
)
def __init__(self, *args, **kwargs):
self.isopen = False
super(LFA, self).__init__(*args, **kwargs)
if not LFA4py.wlfatest(self.container.abspath):
raise IOError("This resource is not a LFA one.")
if self.openmode in ('r', 'a'):
guess = LFA(filename=self.container.abspath, openmode=self.openmode)
if not all([v in guess.listfields() for v in ['ECHEANCE', 'DATE', 'DOCFICHIER',
'VPP0', 'VPP1']]):
raise IOError("this resource is not a DDHLFA one.")
guess.close()
if self.openmode != 'r':
raise NotImplementedError("openmode != 'r' for DDHLFA.")
elif self.openmode == 'r':
if self.domains is not None or self.validity is not None:
epylog.warning(self.container.abspath +
": DDHLFA.__init__(): domains/validity" +
" argument will be ignored with" +
" this openmode ('r').")
if not self.fmtdelayedopen:
self.open()
[docs] def open(self, openmode=None):
"""
Opens the DDHLFA in Fortran sense, and initializes domains, validity
and vertical geometry.
:param openmode: optional, to open with a specific openmode, eventually
different from the one specified at initialization.
"""
super(DDHLFA, self).open(openmode=openmode)
if self.openmode == 'r':
# read info
self._attributes['xpid'] = super(DDHLFA, self).readfield('INDICE EXPERIENCE').getdata()[0]
self._read_geometry()
self._read_validity()
else:
raise NotImplementedError("openmode != 'r' for DDHLFA.")
################
# ABOUT FIELDS #
################
[docs] def find_fields_in_resource(self,
seed=None,
fieldtype=None,
generic=False,
**_):
"""
Returns a list of the fields from resource whose name match the given
*seed*.
:param seed: might be a regular expression, a list of regular expressions
or *None*. If *None* (default), returns the list of all fields in
resource.
:param fieldtype: optional, among ('V1D', 'Point', 'Misc'). If provided,
filters out the fields not of the given type.
"""
fieldslist = super(DDHLFA, self).find_fields_in_resource(seed=seed)
if generic:
raise NotImplementedError("not yet.")
if fieldtype is not None:
fs = FieldSet()
for f in fieldslist:
fld = self.readfield(f, getdata=False)
if isinstance(fld, FieldSet):
fs.append(fld[0])
else:
fs.append(fld)
if fieldtype == 'V1D':
fieldslist = [f.fid[self.format] for f in fs
if isinstance(f, V1DField)]
elif fieldtype == 'Point':
fieldslist = [f.fid[self.format] for f in fs
if isinstance(f, PointField)]
elif fieldtype == 'Misc':
fieldslist = [f.fid[self.format] for f in fs
if isinstance(f, MiscField)]
else:
raise epygramError("unknown fieldtype for DDHLFA: " + fieldtype)
return fieldslist
@FileResource._openbeforedelayed
def readfield(self, fieldname,
getdata=True,
footprints_proxy_as_builder=config.footprints_proxy_as_builder):
"""
Reads a field in resource.
- Documentation fields ('INDICE EXPERIENCE', 'DATE', 'DOCFICHIER',
'ECHEANCE', 'DOCDnnn') are returned as :class:`epygram.formats.LFA`
returns.
- Profile/surface fields are returned as a
:class:`epygram.base.FieldSet` of 1D/Point fields, one for each
domain.
:param getdata: if False, do not read data but only metadata
:param footprints_proxy_as_builder: if *True*, uses footprints.proxy
to build fields and geometry.
"""
if footprints_proxy_as_builder:
field_builder = footprints.proxy.field
else:
if fieldname[0] in ('S', 'G'):
field_builder = PointField
else:
field_builder = V1DField
field_from_LFA = super(DDHLFA, self).readfield(fieldname,
getdata=getdata)
if fieldname in ('INDICE EXPERIENCE', 'DATE', 'DOCFICHIER', 'ECHEANCE')\
or fieldname[0:4] == 'DOCD':
# documentation
field_from_LFA.setfid({self.format:field_from_LFA.fid['LFA']})
toreturn = field_from_LFA
else:
# true field
validity = FieldValidity(basis=self.validity.getbasis())
# distinction between initial, final and integrated fields
if fieldname[0:2] == 'SV':
validity.set(date_time=self.validity.get())
elif fieldname[0:2] == 'SF':
validity.set(date_time=self.validity.get(),
cumulativeduration=self.validity.cumulativeduration())
elif fieldname[0] in ('V', 'S'):
if fieldname[-1] == '0':
validity.set(date_time=self.validity.getbasis())
elif fieldname[-1] == '1':
validity.set(date_time=self.validity.get())
elif fieldname[0] in ('T', 'F', 'G') or fieldname == 'PPP':
validity.set(date_time=self.validity.get(),
cumulativeduration=self.validity.cumulativeduration())
toreturn = FieldSet()
if fieldname[0] in ('S', 'G'):
# surface fields
for d in range(self.domains['number']):
if getdata:
value = field_from_LFA.getdata()[d]
domain = self.domains['geometry'][d]
pgeometry = UnstructuredGeometry(name='DDH:' + domain['type'],
grid={'DDH_domain':domain},
vcoordinate=VGeometry(
typeoffirstfixedsurface=1,
levels=FPList([1])),
dimensions={'X':1, 'Y':1})
field = field_builder(structure='Point',
fid={self.format:fieldname},
geometry=pgeometry,
validity=validity,
processtype=self.processtype)
if getdata:
field.setdata(value)
toreturn.append(field)
else:
# profile fields
if not getdata:
fieldlevels = (LFA4py.wlfacas(self._unit, fieldname)[1] //
self.domains['number'])
else:
fieldlevels = (len(field_from_LFA.getdata()) //
self.domains['number'])
if fieldlevels == self.levels['VT']:
position_on_grid = 'mass'
# gridposition = 'mass'
# gridsize = self.levels['VT']
elif fieldlevels == self.levels['F']:
position_on_grid = 'flux'
# gridposition = 'flux'
# gridsize = self.levels['F']
for d in range(self.domains['number']):
domain = self.domains['geometry'][d]
if fieldlevels == self.levels['VT']:
if validity.term(fmt='IntHours') == 0:
pressure_vertical_grid = self.domains['vertical_grid'][d]['masslevels_pressure_init']
else:
pressure_vertical_grid = self.domains['vertical_grid'][d]['masslevels_pressure_term']
elif fieldlevels == self.levels['F']:
if validity.term(fmt='IntHours') == 0:
pressure_vertical_grid = self.domains['vertical_grid'][d]['fluxlevels_pressure_init']
else:
pressure_vertical_grid = self.domains['vertical_grid'][d]['fluxlevels_pressure_term']
vcoordinate = VGeometry(typeoffirstfixedsurface=100,
levels=FPList(pressure_vertical_grid / 100.),
# grid={'gridposition':gridposition, # TODO: ?
# 'gridlevels':pressure_vertical_grid}, # TODO: ?
position_on_grid=position_on_grid)
vgeometry = UnstructuredGeometry(name='DDH:' + domain['type'],
grid={'DDH_domain':domain},
vcoordinate=vcoordinate,
dimensions={'X':1, 'Y':1})
if getdata:
profile = field_from_LFA.getdata()[d * fieldlevels:(d + 1) * fieldlevels]
field = field_builder(structure='V1D',
fid={self.format:fieldname, 'generic':FPDict()},
geometry=vgeometry,
validity=validity,
processtype=self.processtype)
if getdata:
field.setdata(profile)
toreturn.append(field)
return toreturn
[docs] def readfields(self, requestedfields, **kwargs):
"""
Inactivation of readfields because readfield already returns a FieldSet.
"""
raise epygramError("readfields() method disabled for DDHLFA.")
#################
# DDH interface #
#################
@FileResource._openbeforedelayed
def what(self, out=sys.stdout, sortfields=False, **_):
"""
Writes in file a summary of the contents of the DDHLFA.
:param out: the output open file-like object (duck-typing: *out*.write()
only is needed).
:param sortfields: **True** if the fields have to be sorted by type.
"""
domains = self.domains
mass_vertical_levels = len(self.readfield('VPP0')[0].geometry.vcoordinate.levels)
listoffields = self.listfields()
validity = self.validity
# Write out
out.write("### FORMAT: " + self.format + "\n")
out.write("\n")
out.write("### IDENTIFIER (INDICE EXPERIENCE): " + self.xpid + "\n")
out.write("\n")
out.write("################\n")
out.write("### VALIDITY ###\n")
out.write("################\n")
write_formatted(out, "Validity", validity.get())
write_formatted(out, "Basis", validity.getbasis())
write_formatted(out, "Term", validity.term())
write_formatted(out, "Duration for cumulative quantities",
validity.cumulativeduration())
write_formatted(out, "Kind of producting process", self.processtype)
out.write(separation_line)
out.write("\n")
out.write("###############\n")
out.write("### DOMAINS ###\n")
out.write("###############\n")
out.write("Domain(s): " + str(domains['number']) + " " +
domains['geometry'][0]['type'] + "\n")
for d in range(domains['number']):
geom = domains['geometry'][d]
if geom['type'] != 'globe':
out.write("# " + str(d + 1) + ":\n")
if geom['type'] == 'ij_point':
write_formatted(out, "Longitude", geom['lon'].get('degrees'))
write_formatted(out, "Latitude", geom['lat'].get('degrees'))
write_formatted(out, "Point index (X)", geom['jlon'])
write_formatted(out, "Point index (Y)", geom['jgl'])
elif geom['type'] == 'point':
write_formatted(out, "Longitude", geom['lon'].get('degrees'))
write_formatted(out, "Latitude", geom['lat'].get('degrees'))
write_formatted(out, "Point index (X)", geom['jlon'])
write_formatted(out, "Point index (Y)", geom['jgl'])
write_formatted(out, "User Longitude",
geom['user_lon'].get('degrees'))
write_formatted(out, "User Latitude",
geom['user_lat'].get('degrees'))
elif geom['type'] in ('quadrilateral', 'rectangle'):
write_formatted(out, "Longitude 1", geom['lon1'].get('degrees'))
write_formatted(out, "Latitude 1", geom['lat1'].get('degrees'))
write_formatted(out, "Longitude 2", geom['lon2'].get('degrees'))
write_formatted(out, "Latitude 2", geom['lat2'].get('degrees'))
write_formatted(out, "Longitude 3", geom['lon3'].get('degrees'))
write_formatted(out, "Latitude 3", geom['lat3'].get('degrees'))
write_formatted(out, "Longitude 4", geom['lon4'].get('degrees'))
write_formatted(out, "Latitude 4", geom['lat4'].get('degrees'))
elif geom['type'] == 'zonal_bands':
write_formatted(out, "Center Latitude",
geom['lat'].get('degrees'))
out.write(separation_line)
out.write("\n")
out.write("#########################\n")
out.write("### VERTICAL GEOMETRY ###\n")
out.write("#########################\n")
write_formatted(out, "Number of vertical levels for variables",
mass_vertical_levels)
write_formatted(out, "Number of vertical levels for fluxes",
mass_vertical_levels + 1)
out.write(separation_line)
out.write("\n")
out.write("######################\n")
out.write("### LIST OF FIELDS ###\n")
out.write("######################\n")
numfields = len(listoffields)
out.write("Number: " + str(numfields) + "\n")
out.write(separation_line)
if sortfields:
out.write("Documentation:\n")
out.write("-------------\n")
docfields = [f for f in listoffields if
f in ('INDICE EXPERIENCE', 'DATE', 'DOCFICHIER',
'ECHEANCE') or f[0:4] == 'DOCD']
for f in docfields:
out.write(f + "\n")
out.write("-----------------\n")
out.write("Vertical profiles:\n")
out.write("-----------------\n")
profilefields = [f for f in listoffields
if f[0] not in ('S', 'G') and f not in docfields]
for f in profilefields:
out.write(f + "\n")
out.write("--------------\n")
out.write("Surface fields:\n")
out.write("--------------\n")
surfacefields = [f for f in listoffields
if f not in docfields and f not in profilefields]
for f in surfacefields:
out.write(f + "\n")
else:
for f in listoffields:
out.write(f + "\n")
out.write(separation_line)
@FileResource._openbeforedelayed
def _read_geometry(self):
"""
Reads DOCFICHIER and DOCDnnn fields, to fill-in domains and levels
attributes. Cf. J.M. Piriou's documentation about DDH.
"""
domains = {}
DOCFICHIER = self.readfield('DOCFICHIER')
vpp_init = super(DDHLFA, self).readfield('VPP0')
vpp_term = super(DDHLFA, self).readfield('VPP1')
NFLEV = DOCFICHIER.getdata()[5]
self._attributes['levels'] = {'VT':NFLEV, 'F':NFLEV + 1}
if DOCFICHIER.getdata()[0] == 1:
domains['type'] = 'limited_area_domains'
elif DOCFICHIER.getdata()[0] == 5:
domains['type'] = 'global_domain'
elif DOCFICHIER.getdata()[0] == 6:
domains['type'] = 'zonal_bands'
else:
raise NotImplementedError("DOCFICHIER[0] not among (1,5,6).")
domains['number'] = DOCFICHIER.getdata()[14]
domains['geometry'] = []
domains['vertical_grid'] = []
if self.xpid == 'ARPE':
vertical_mean = 'arithmetic'
else:
vertical_mean = 'geometric'
for d in range(1, domains['number'] + 1):
DOCD = self.readfield('DOCD' + '{:0>{width}}'.format(str(d),
width=3))
DOCD_data = DOCD.getdata()
geom = {}
if DOCD_data[10] == 1.:
geom['type'] = 'ij_point'
geom['lon'] = Angle(DOCD_data[2], 'radians')
geom['lat'] = Angle(math.asin(DOCD_data[3]), 'radians')
geom['jlon'] = int(DOCD_data[4])
geom['jlgl'] = int(DOCD_data[5])
elif DOCD_data[10] == 4.:
geom['type'] = 'point'
geom['lon'] = Angle(DOCD_data[2], 'radians')
geom['lat'] = Angle(math.asin(DOCD_data[3]), 'radians')
geom['jlon'] = int(DOCD_data[4])
geom['jgl'] = int(DOCD_data[5])
geom['user_lon'] = Angle(DOCD_data[6], 'radians')
geom['user_lat'] = Angle(math.asin(DOCD_data[7]), 'radians')
elif DOCD_data[10] == 2.:
geom['type'] = 'quadrilateral'
geom['lon1'] = Angle(DOCD_data[2], 'radians')
geom['lat1'] = Angle(math.asin(DOCD_data[3]), 'radians')
geom['lon2'] = Angle(DOCD_data[4], 'radians')
geom['lat2'] = Angle(math.asin(DOCD_data[5]), 'radians')
geom['lon3'] = Angle(DOCD_data[6], 'radians')
geom['lat3'] = Angle(math.asin(DOCD_data[7]), 'radians')
geom['lon4'] = Angle(DOCD_data[8], 'radians')
geom['lat4'] = Angle(math.asin(DOCD_data[9]), 'radians')
elif DOCD_data[10] == 3.:
geom['type'] = 'rectangle'
geom['lon1'] = Angle(DOCD_data[2], 'radians')
geom['lat1'] = Angle(math.asin(DOCD_data[3]), 'radians')
geom['lon2'] = Angle(DOCD_data[4], 'radians')
geom['lat2'] = Angle(math.asin(DOCD_data[5]), 'radians')
geom['lon3'] = Angle(DOCD_data[6], 'radians')
geom['lat3'] = Angle(math.asin(DOCD_data[7]), 'radians')
geom['lon4'] = Angle(DOCD_data[8], 'radians')
geom['lat4'] = Angle(math.asin(DOCD_data[9]), 'radians')
elif DOCD_data[10] == 5.:
geom['type'] = 'globe'
elif DOCD_data[10] == 6.:
geom['type'] = 'zonal_bands'
geom['lat'] = Angle(math.asin(DOCD_data[3]), 'radians')
geom['band'] = DOCD_data[1]
geom['bands_number'] = DOCD_data[2]
else:
raise ValueError('unknown value: ' + str(DOCD_data[10]) +
' for DOCD' + '{:0>{width}}'.format(str(d),
width=3))
# vertical grid
dm1 = d - 1
vgrid = {}
fluxlevels_pressure_init = vpp_init.getdata()[dm1 * self.levels['VT']:(dm1 + 1) * self.levels['VT']
] * g0
fluxlevels_pressure_init = [sum(fluxlevels_pressure_init[0:i])
for i in range(1, len(fluxlevels_pressure_init) + 1)]
vgrid['fluxlevels_pressure_init'] = numpy.array([0.] + fluxlevels_pressure_init)
vgrid['masslevels_pressure_init'] = profiles.flux2masspressures(fluxlevels_pressure_init,
vertical_mean)
fluxlevels_pressure_term = vpp_term.getdata()[dm1 * self.levels['VT']:(dm1 + 1) * self.levels['VT']
] * g0
fluxlevels_pressure_term = [sum(fluxlevels_pressure_term[0:i])
for i in range(1, len(fluxlevels_pressure_term) + 1)]
vgrid['fluxlevels_pressure_term'] = numpy.array([0.] + fluxlevels_pressure_term)
vgrid['masslevels_pressure_term'] = profiles.flux2masspressures(fluxlevels_pressure_term,
vertical_mean)
domains['geometry'].append(geom)
domains['vertical_grid'].append(vgrid)
self._attributes['domains'] = domains
@FileResource._openbeforedelayed
def _read_validity(self):
"""
Reads DATE and ECHEANCE fields, to fill-in a FieldValidity object.
Cf. J.M. Piriou's documentation about DDH.
"""
DATE = self.readfield('DATE')
DATE_data = DATE.getdata()
year = DATE_data[0]
month = DATE_data[1]
day = DATE_data[2]
hour = DATE_data[3]
minute = DATE_data[4]
processtype = DATE_data[8]
if processtype == 0:
self.processtype = 'analysis'
elif processtype == 1:
self.processtype = 'initialization'
elif processtype == 10:
self.processtype = 'forecast'
# cumulated_timesteps = DATE_data[9]
basis = datetime.datetime(year, month, day, hour, minute)
ECHEANCE = self.readfield('ECHEANCE')
term = datetime.timedelta(seconds=int(ECHEANCE.getdata()[0]))
cumulativeduration = term
self.validity = FieldValidity(basis=basis,
term=term,
cumulativeduration=cumulativeduration)