#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains the class that handle a CombineLevelsResource.
This resource exposes 3D fields when the low level resource only expose horizontal fields.
"""
from __future__ import print_function, absolute_import, unicode_literals, division
import six
import copy
from footprints import proxy as fpx
from footprints import FPDict
from epygram.base import Resource, FieldSet
from epygram.util import fmtfid
from epygram import epygramError
[docs]class CombineLevelsResource(Resource):
"""Class implementing a CombineLevelsResource."""
_collector = ('resource_modificator', 'epyresource')
_footprint = dict(
attr=dict(
resource=dict(
type=Resource,
info="Low level resource"),
name=dict(
values=set(['CombineLevels'])),
virtual=dict(
info="Must readfield return a virtual field",
type=bool,
optional=True,
default=False)
)
)
def __init__(self, *args, **kwargs):
super(Resource, self).__init__(*args, **kwargs)
if self.resource.openmode != self.openmode:
raise epygramError("The low level resource must be opened using the same mode as this high-level resource.")
self.format = "CombineLevels"
[docs] def open(self):
"""Opens the low level resource"""
if not self.resource.isopen:
self.resource.open()
@property
def isopen(self):
return self.resource.isopen
[docs] def close(self):
"""Closes the low level resource."""
try:
self.resource.close()
except Exception:
pass
[docs] def find_fields_in_resource(self, seed=None, generic=False):
"""
Returns a list of the fields from resource whose name match the given
seed.
:param seed: might be a 'handgrip', i.e. a dict where you can store all
requested keys,
e.g. {'shortName':'t', 'indicatorOfTypeOfLevel':'pl', 'level':850},
a list of handgrips or *None*. If *None* (default), returns the list
of all fields in resource.
:param generic: if True, returns a list of tuples (fid, fid) of
the fields (for mimetism with other formats).
"""
if seed is None or isinstance(seed, dict):
fieldslist = self.listfields(select=seed)
elif isinstance(seed, list):
fieldslist = []
for s in seed:
fieldslist.extend(self.listfields(select=s))
else:
raise epygramError("unknown type for seed: " + str(type(seed)))
if fieldslist == []:
raise epygramError("no field matching '" + str(seed) +
"' was found in resource ")
if generic:
fieldslist = [(fieldslist[i], fieldslist[i]) for i in range(len(fieldslist))]
return fieldslist
def _create_list(self):
"""Creates the list of available fields associated with the original fids."""
result = {}
# Loop over the low level resource fids
for fid in self.resource.listfields(complete=True):
assert 'generic' in fid, \
"Not able to combine levels if fields do not have 'generic' fids"
original_fid = fid[fmtfid(self.resource.format, fid)]
generic_fid = copy.deepcopy(fid['generic'])
level = generic_fid.pop('level', None) # we suppress level from generic_fid
scaledValueOfFirstFixedSurface = generic_fid.pop('scaledValueOfFirstFixedSurface', None)
hashable_generic_fid = tuple([(k, generic_fid[k]) for k in sorted(generic_fid.keys())])
if hashable_generic_fid not in result:
result[hashable_generic_fid] = {'original_fids':[], 'generic':None}
result[hashable_generic_fid]['original_fids'].append((original_fid, level, scaledValueOfFirstFixedSurface))
result[hashable_generic_fid]['generic'] = generic_fid
# For fields present on only one layer, we put again the level in the generic fid
for k, v in result.items():
if len(v['original_fids']) == 1:
if v['original_fids'][0][1] is not None:
v['generic']['level'] = v['original_fids'][0][1]
if v['original_fids'][0][2] is not None:
v['generic']['scaledValueOfFirstFixedSurface'] = v['original_fids'][0][2]
return result
[docs] def listfields(self, onlykey=None, select=None, complete=False):
"""Lists the available fields."""
fidlist = [v['generic'] for v in self._create_list().values()]
if select is not None:
fidlist = [f for f in fidlist if all([(k in f and f[k] == select[k]) for k in select.keys()])]
if onlykey is not None:
if isinstance(onlykey, six.string_types):
fidlist = [f[onlykey] for f in fidlist]
elif isinstance(onlykey, tuple):
fidlist = [{k:f[k] for k in onlykey} for f in fidlist]
if complete:
fidlist = [{'generic': fid, self.format:fid} for fid in fidlist]
return fidlist
[docs] def sortfields(self, sortingkey, onlykey=None):
"""
Returns a sorted list of fields with regards to the given *sortingkey*
of their fid, as a dict of lists.
:param sortingkey: sorting key
:param onlykey: can be specified as a string or a tuple of strings,
so that only specified keys of the fid will returned.
"""
# Taken from GRIB.py
sortedfields = {}
listoffields = self.listfields()
onlykeylistoffields = self.listfields(onlykey=onlykey)
for f in range(len(listoffields)):
try:
category = listoffields[f][sortingkey]
except KeyError:
category = 'None'
field = onlykeylistoffields[f]
if category in sortedfields:
sortedfields[category].append(field)
else:
sortedfields[category] = [field]
return sortedfields
[docs] def readfield(self, handgrip, getdata=True):
"""
Read the field in the low level resource and join the levels.
:param handgrip: identification of the field
:param getdata: if False, do not read data but only metadata
"""
result = self.readfields(handgrip=handgrip, getdata=getdata)
if len(result) != 1:
raise epygramError(str(len(result)) + "field(s) have been found, only one expected.")
return result[0]
[docs] def readfields(self, handgrip, getdata=True):
"""
Read the field in the low level resource and join the levels.
:param handgrip: identification of the field
:param getdata: if False, do not read data but only metadata
"""
fieldset = FieldSet()
cont = self._create_list()
for fid in self.listfields(select=handgrip):
fid = FPDict(fid) # footprints purpose
found = None
for k, v in cont.items():
if fid == v['generic']:
if found is not None:
raise epygramError("Internal error...")
else:
found = k
if found is None:
raise epygramError("Internal error....")
if 'parameterNumber' not in fid or fid['parameterNumber'] == 255:
# We do not join levels when parameterNumber is not known
print("readfields", handgrip)
for original_fid in cont[found]['original_fids']:
field = self.resource.readfield(original_fid[0], getdata=getdata)
field.fid[self.format] = fid
fieldset.append(field)
else:
fidList = [original_fid[0] for original_fid in cont[found]['original_fids']]
if len(fidList) == 1:
field = self.resource.readfield(fidList[0], getdata=getdata)
field.fid[self.format] = fid
fieldset.append(field)
else:
structure = self.resource.readfield(fidList[0], getdata=False).structure
virtualField = fpx.field(fid={'generic':fid, self.format:fid},
structure={'H2D':'3D',
'Point':'V1D',
'H1D':'V2D',
'V1D':'V1D',
'V2D':'V2D',
'3D':'3D'}[structure],
resource=self.resource,
resource_fids=fidList)
if self.virtual:
fieldset.append(virtualField)
else:
fieldset.append(virtualField.as_real_field(getdata=getdata))
return fieldset
[docs] def writefield(self, *args, **kwargs):
"""Write fields."""
raise NotImplementedError("writefield is not implemented")
# To implement writefield we need to check that each validity is affected to only one resource
@property
def spectral_geometry(self):
"""Returns the spectral_geometry"""
return self.resource.spectral_geometry