#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
FA fields utilities
"""
from __future__ import print_function, absolute_import, unicode_literals, division
import os
import re
import six
import io
from epygram.extra import griberies
from epygram import config, epygramError
[docs]class FaGribDef(griberies.GribDef):
"""
Handle FA-related GRIB definition files.
To add user files:
a) use env vars GRIBAPI|ECCODES_DEFINITION_PATH or
b) move it under {config.userlocaldir}/{FaGribDef.dirname}
"""
_re_dirname = re.compile(r'gribapi\.def\.[\d_]+')
_official_rootdir = os.path.join(config.installdir, 'data')
_non_GRIB_keys = ('LSTCUM', 'FMULTM', 'FMULTE', 'ZLMULT')
def __init__(self, actual_init=True,
concepts=['faFieldName', 'faModelName', 'faLevelName']):
super(FaGribDef, self).__init__(actual_init, concepts)
def _find_gribdefs(self, root):
dirs = []
if os.path.exists(root) and os.path.isdir(root):
for d in sorted(os.listdir(root)):
if self._re_dirname.match(d):
d = os.path.join(root, d)
if os.path.isdir(d):
dirs.append(d)
return dirs
def _actual_init(self):
"""Read definition files."""
# first, those specified by env var
defpaths = griberies.get_definition_paths()
# then those embarked with epygram
defpaths = self._find_gribdefs(self._official_rootdir) + defpaths
# complete with local user definitions, if existing
defpaths = self._find_gribdefs(config.userlocaldir) + defpaths
# read gribdef files
for d in defpaths[::-1]:
for grib_edition in ('grib1', 'grib2'):
for concept in self._concepts:
self._read_localConcept(concept, d, grib_edition)
self._initialized = True
def _read_localConcept(self, concept, directory,
grib_edition=griberies.GribDef._default_grib_edition):
pathname = os.path.join(directory, grib_edition, 'localConcepts', 'lfpw', concept + '.def')
if os.path.exists(pathname):
self.read(pathname, grib_edition)
@griberies.init_before
def FA2GRIB(self, fieldname,
grib_edition=griberies.GribDef._default_grib_edition,
include_comments=False,
fatal=False,
filter_non_GRIB_keys=True):
"""
Convert FA field name to GRIB fid, including vertical level identification.
:param grib_edition: among ('grib1', 'grib2'), the version of GRIB fid
:param include_comments: if a comment is present if grib def, bring it in fid
:param fatal: if True and fieldname is not retrieved, raise a ValueError;
else, return a default 255 fid
:param filter_non_GRIB_keys: filter out the non-GRIB keys that may be
present in grib def of field
"""
_re_altitude = re.compile(r'(?P<ltype>[A-W]+)(?P<level>\d+)(?P<param>.+)')
if fieldname in self.tables[grib_edition]['faFieldName']:
fid = self._get_def(fieldname, 'faFieldName',
grib_edition, include_comments)
elif fieldname.replace(' ', '_') in self.tables[grib_edition]['faFieldName']: # FIXME: ?
fid = self._get_def(fieldname.replace(' ', '_'), 'faFieldName',
grib_edition, include_comments)
else:
rematch = _re_altitude.match(fieldname)
if rematch:
fid = self._get_def(rematch.group('ltype'), 'faLevelName',
grib_edition, include_comments,
fatal=False)
fid.update(self._get_def(rematch.group('param'), 'faFieldName',
grib_edition, include_comments,
fatal=False))
level = int(rematch.group('level'))
if level == 0: # formatting issue
level = 100000
fid['scaleFactorOfFirstFixedSurface'] = 0
fid['scaledValueOfFirstFixedSurface'] = level
if rematch.group('ltype') == 'P':
level /= 100
fid['level'] = level
else:
if fatal:
raise ValueError("field not found: {}".format(fieldname))
else:
fid = self._get_def('default', 'faFieldName',
grib_edition, include_comments)
fid.setdefault('typeOfFirstFixedSurface', 255)
if filter_non_GRIB_keys:
fid = self._filter_non_GRIB_keys(fid)
# productDefinitionTemplateNumber to distinguish between
# CLSTEMPERATURE and CLSMINI.TEMPERAT / CLSMAXI.TEMPERAT (for instance)
if 'productDefinitionTemplateNumber' not in fid:
fid['productDefinitionTemplateNumber'] = 0
return fid
@griberies.init_before
def GRIB2FA(self, gribfid,
grib_edition=griberies.GribDef._default_grib_edition):
"""
Look for a unique matching field in tables.
! WARNING ! the unicity might not be ensured depending on the version
of grib definitions files.
:param grib_edition: among ('grib1', 'grib2'), the version of GRIB fid
"""
matching_fields = self.lookup_GRIB(gribfid, grib_edition)
if len(matching_fields) == 1:
return matching_fields.keys[0]
elif len(matching_fields) == 0:
raise ValueError("No field matching *fid* was found in tables.")
elif len(matching_fields) > 1:
raise ValueError("Several fields matching were found. Use method *lookup_GRIB* to refine.")
@griberies.init_before
def lookup_FA(self, partial_fieldname,
grib_edition=griberies.GribDef._default_grib_edition,
include_comments=False,
filter_non_GRIB_keys=True):
"""
Look for all the fields which FA name contain **partial_fieldname**.
:param grib_edition: among ('grib1', 'grib2'), the version of GRIB fid
:param include_comments: if a comment is present if grib def, bring it in fid
:param filter_non_GRIB_keys: filter out the non-GRIB keys that may be
present in grib def of field
"""
fields = {}
try: # first try to get it as an altitude one
gribfid = self.FA2GRIB(partial_fieldname,
grib_edition=grib_edition,
include_comments=include_comments,
fatal=True,
filter_non_GRIB_keys=filter_non_GRIB_keys)
except ValueError:
pass # field was not found as such; might be partial => finally
else:
fields[partial_fieldname] = gribfid
finally:
for f, gribfid in self.tables[grib_edition]['faFieldName'].items():
if partial_fieldname in f:
fields[f] = gribfid
for k, fid in fields.items():
if not include_comments:
fid.pop('#comment', None)
if filter_non_GRIB_keys:
fields[k] = self._filter_non_GRIB_keys(fid)
return fields
@griberies.init_before
def lookup_GRIB(self, partial_fid,
grib_edition=griberies.GribDef._default_grib_edition,
include_comments=False,
filter_non_GRIB_keys=True):
"""
Look for all the fields which GRIB fid contain **partial_fid**.
:param grib_edition: among ('grib1', 'grib2'), the version of GRIB fid
:param include_comments: if a comment is present if grib def, bring it in fid
:param filter_non_GRIB_keys: filter out the non-GRIB keys that may be
present in grib def of field
"""
return self._lookup_from_kv(partial_fid, 'faFieldName',
grib_edition=grib_edition,
include_comments=include_comments,
filter_non_GRIB_keys=filter_non_GRIB_keys)
def __contains__(self, fid):
try:
if isinstance(fid, six.string_types):
fid = griberies.parse_GRIBstr_todict(fid)
except SyntaxError: # fid is a FA fieldname
try:
self.FA2GRIB(fid, fatal=True)
except ValueError:
ok = False
else:
ok = True
else: # fid is a GRIB fid
ok = len(self.lookup_GRIB(fid)) > 0
return ok
[docs]class SfxFldDesc_Mod(object):
"""Handle fields catalog from sfxflddesc.F90 source file."""
_pattern = re.compile(r'"\.(?P<name>[\w\d%]+)\.+(?P<gridtype>\d)\.{2}(?P<type>\w\d)\.{2}(?P<comment>.+)\.{2}(?P<mask>.{20})\.{2}",? ?(&|/)')
_unit = re.compile(r'(?P<comment>.*)\((?P<unit>.+)\)')
_fortran_sourcename = 'sfxflddesc_mod.F90'
type2nature = {'X':'float', 'L':'bool', 'C':'str', 'N':'int', 'Y':'float',
'T':'int'} # FIXME: T: cf mode_write_surf_fa.F90
def __init__(self, actual_init=True):
self.table = {}
if actual_init:
self._actual_init()
else:
self._initialized = False
def _actual_init(self):
"""Read official file and, if existing, user local file."""
# official one
filename = os.path.join(config.installdir, 'data',
self._fortran_sourcename)
self.read(filename)
# local user one
user_sfxflddesc = os.path.join(config.userlocaldir,
self._fortran_sourcename)
if os.path.exists(user_sfxflddesc):
self.read(user_sfxflddesc)
self._initialized = True
[docs] def read(self, filename):
"""Parse a sfxflddesc_mod.F90 file."""
with io.open(filename, 'r') as f:
lines = f.readlines()
for line in lines:
m = self._pattern.match(line)
if m:
info = {}
name = m.group('name').replace('.', '')
info['gridtype'] = int(m.group('gridtype'))
info['type'] = m.group('type')
info['comment'] = m.group('comment').replace('.', ' ').strip()
u = self._unit.match(info['comment'])
if u:
info['unit'] = u.group('unit').replace('.', ' ')
info['comment'] = u.group('comment').strip()
mask = m.group('mask').replace('.', '')
if mask:
info['mask'] = mask
self.table[name] = info
self.table['S1D_' + name] = info
self.table['SFX.' + name] = info
[docs] def update(self, field_dict):
"""Update with a dict of field {fieldname:{}, ...}."""
self.table.update(field_dict)
@griberies.init_before
def nature(self, fieldname, default=None):
"""Return type of data in field."""
try:
return self.type2nature[self.table[fieldname]['type'][0]]
except KeyError:
if default is None:
raise
else:
return default
@griberies.init_before
def dim(self, fieldname, default=None):
"""Return number of dimensions of field."""
try:
return int(self.table[fieldname]['type'][1])
except KeyError:
if default is None:
raise
else:
return default
@griberies.init_before
def __getitem__(self, fieldname):
return self.table[fieldname]
@griberies.init_before
def get(self, fieldname, default=None):
return self.table.get(fieldname, default)
@griberies.init_before
def is_metadata(self, fieldname):
"""True if field is not a H2D field but metadata (Misc)."""
if fieldname in self.table:
if self.table[fieldname]['type'] in ('X1', 'X2'):
return False
else:
return True
elif '_FBUF_' in fieldname:
return True
else:
return None
@griberies.init_before
def __contains__(self, fieldname):
return fieldname in self.table
[docs]def find_wind_pair(fieldname):
"""For a wind **fieldname**, find and return the pair."""
pairs = {'U':'V', 'V':'U',
'ZONAL':'MERIDIEN', 'MERIDIEN':'ZONAL', 'MERID':'ZONAL'}
patterns = [r'S\d+WIND\.(?P<d>[U,V])\.PHYS',
r'[P,H,V]\d+VENT_(?P<d>(ZONAL)|(MERID)|(MERIDIEN))',
r'CLSVENTNEUTRE.(?P<d>[U,V])',
r'CLSVENT.(?P<d>(ZONAL)|(MERIDIEN))',
r'CLS(?P<d>[U,V]).RAF.MOD.XFU']
axes = {'U':'x', 'ZONAL':'x',
'V':'y', 'MERIDIEN':'y', 'MERID':'y'}
pair = None
for pattern in patterns:
re_ok = re.match(pattern, fieldname)
if re_ok:
pair = (axes[pairs[re_ok.group(1)]],
fieldname.replace(re_ok.group('d'),
pairs[re_ok.group('d')]))
break
if pair is None:
raise epygramError('not a wind field')
else:
return pair