Source code for epygram.base

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains some base classes of *epygram*.
"""
from __future__ import print_function, absolute_import, unicode_literals, division

import numpy
import datetime
import copy
import sys
import hashlib
import six

import footprints
from footprints import FootprintBase, FPDict

from epygram import epygramError, config
from epygram.util import RecursiveObject, separation_line, write_formatted

epylog = footprints.loggers.getLogger(__name__)


[docs]class Field(RecursiveObject, FootprintBase): """ Generic abstract class implementing a Field, composed of an identifier and a data. The field identifier *fid* identifies a field with a set of keys. Each key (named after the format name) identifies the field for a given format. A specific key 'generic' is a GRIB2-like description. E.g. *{'FA':'SURFTEMPERATURE', 'generic':{'typeOfFirstFixedSurface':1, 'discipline':2, 'parameterCategory':3, 'parameterNumber':18}}*. """ _collector = ('field',) _abstract = True _footprint = dict( attr=dict( fid=dict( type=FPDict, access='rwx'), comment=dict( optional=True, access='rwd'), misc_metadata=dict( type=FPDict, optional=True, default=FPDict(), access='rwd'), units=dict( optional=True, access='rwd', default='') ) ) def __init__(self, *args, **kwargs): """Constructor. See its footprint for arguments.""" super(Field, self).__init__(*args, **kwargs) self._data = None
[docs] def getdata(self): """ Returns the field data. Generic, default method for inheriting classes that do not overwrite it. """ return self._data
[docs] def setdata(self, data): """Sets or overwrites the field data as a numpy array.""" if not isinstance(data, numpy.ndarray): data = numpy.array(data) self._data = data
[docs] def deldata(self): """Empties the data.""" self._data = None
data = property(getdata, setdata, deldata, "Accessor to the field data.")
[docs] def setfid(self, fid): """ Sets or overwrites the field fid given as a dict. """ if not isinstance(fid, dict): raise epygramError("**fid** must be a dict.") self._attributes['fid'] = fid
[docs] def clone(self, fid=None): """ Returns a cloned field, optionally with a new **fid** given as a dict. """ clone = self.deepcopy() if fid is not None: clone.setfid(fid) return clone
################### # pre-applicative # ###################
[docs] def stats(self, **kwargs): """ Computes some basic statistics on the field, as a dict containing: {'min', 'max', 'mean', 'std', 'quadmean', 'nonzero'}. See each of these methods for details. Optional arguments can be passed, depending on the inheriting class, passed to getdata(). """ return {'min':self.min(**kwargs), 'max':self.max(**kwargs), 'mean':self.mean(**kwargs), 'std':self.std(**kwargs), 'quadmean':self.quadmean(**kwargs), 'nonzero':self.nonzero(**kwargs)}
[docs] def min(self, **kwargs): """Returns the minimum value of data.""" data = numpy.ma.masked_outside(self.getdata(**kwargs), - config.mask_outside, config.mask_outside) return float(data.min())
[docs] def max(self, **kwargs): """Returns the maximum value of data.""" data = numpy.ma.masked_outside(self.getdata(**kwargs), - config.mask_outside, config.mask_outside) return float(data.max())
[docs] def mean(self, **kwargs): """Returns the mean value of data.""" data = numpy.ma.masked_outside(self.getdata(**kwargs), - config.mask_outside, config.mask_outside) return float(data.mean())
[docs] def std(self, **kwargs): """Returns the standard deviation of data.""" data = numpy.ma.masked_outside(self.getdata(**kwargs), - config.mask_outside, config.mask_outside) return float(data.std())
[docs] def quadmean(self, **kwargs): """Returns the quadratic mean of data.""" data = numpy.ma.masked_outside(self.getdata(**kwargs), - config.mask_outside, config.mask_outside) return float(numpy.sqrt((data ** 2).mean()))
[docs] def absmean(self, **kwargs): """Returns the mean of absolute value of data.""" data = numpy.ma.masked_outside(self.getdata(**kwargs), - config.mask_outside, config.mask_outside) return float(numpy.abs(data).mean())
[docs] def nonzero(self, **kwargs): """ Returns the number of non-zero values (whose absolute value > config.epsilon). """ data = numpy.ma.masked_outside(self.getdata(**kwargs), - config.mask_outside, config.mask_outside) return int(numpy.count_nonzero(abs(data) > config.epsilon))
[docs] def sha256_checksum(self, **kwargs): """ Return a SHA256 checksum of the field data. """ s256 = hashlib.sha256() s256.update(self.getdata(**kwargs).tobytes()) return s256.hexdigest()
############# # OPERATORS # #############
[docs] def operation(self, operation, operand=None): """ Makes the requested operation on the field. :param operation: any of '+', '-', '*', '/', or 'normalize', 'ceil', 'exp', 'log'... and you can try with every other **numpy** function. :param operand: operand for the 4 basic operations, may be a scalar or another field with according geometry. """ if operand is not None: if isinstance(operand, self.__class__): self.operation_with_other(operation, operand) else: self.scalar_operation(operation, operand) else: if operation == 'normalize': self.setdata((self._data - self._data.min()) / (self._data.max() - self._data.min()) ) else: try: self.setdata(getattr(numpy, operation)(self._data)) except Exception: raise
[docs] def operation_with_other(self, operation, other): """ Makes an in-place operation with another field. :param operation: among ('+', '-', '*', '/') :param other: another field, with according dimensions """ self._check_operands(other) if operation == '+': self.setdata(self._data + other._data) elif operation == '*': self.setdata(self._data * other._data) elif operation == '-': self.setdata(self._data - other._data) elif operation == '/': self.setdata(self._data / other._data)
[docs] def scalar_operation(self, operation, scalar): """ Makes an in-place scalar operation on field. :param operation: among ('+', '-', '*', '/') :param scalar: a float or int. """ self._check_operands(scalar) if operation == '+': self.setdata(self._data + scalar) elif operation == '*': self.setdata(self._data * scalar) elif operation == '-': self.setdata(self._data - scalar) elif operation == '/': self.setdata(self._data / scalar)
[docs] def compare_to(self, other): """ Compare a field to another one, with several criteria: - bias: average of errors distribution - std: standard deviation of errors distribution - errmax: maximum absolute error - common_mask: indicate whether fields have the same mask or not; beware that above statistics apply only to commonly unmasked data :return ({bias, std, errmax}, common_mask) """ selfdata = self._masked_data() otherdata = self._masked_data() common_mask = not numpy.any(numpy.logical_xor(selfdata.mask, otherdata.mask)) diff = self - other return ({'bias':diff.mean(), 'std':diff.std(), 'errmax':max(abs(diff.min()), abs(diff.max()))}, common_mask)
[docs] def normalized_comparison(self, ref): """ Compare field to a reference, with prior normalization (by reference magnitude) of both fields. Hence the figures of comparison can be interpretated as percentages. """ refmin = ref.min() refmax = ref.max() selfmin = self.min() selfmax = self.max() if abs(refmax - refmin) <= config.epsilon: # ref is constant if refmin <= config.epsilon: normalizedref = ref if abs(selfmax - selfmin) <= config.epsilon: # test is also constant if selfmin <= config.epsilon: # both 0. : no normalization required normalizedself = self else: # test is not 0. vs. ref is 0. normalizedself = self / selfmin # so that normalized error = 1. else: # test is not constant but ref is constant 0. : normalize by itself normalizedself = (self - selfmin).__div__(selfmax - selfmin) # FIXME: classical operators seem to fail ? else: # ref is constant not 0. normalizedref = ref.__div__(refmin) normalizedself = self.__div__(refmin) else: # ref is not constant normalizedself = (self - refmin).__div__(refmax - refmin) # FIXME: classical operators seem to fail ? normalizedref = (ref - refmin).__div__(refmax - refmin) # FIXME: classical operators seem to fail ? return normalizedself.compare_to(normalizedref)
def _masked_data(self, mask_outside=config.mask_outside, **kwargs): """ Return self field data as a masked array. :param mask_outside: if None, mask is empty; else, mask data outside +/- this value """ if isinstance(self._data, numpy.ma.masked_array): mdata = self.getdata(**kwargs) else: if mask_outside is not None: mdata = numpy.ma.masked_outside(self.getdata(**kwargs), -mask_outside, mask_outside) else: mdata = numpy.ma.masked_array(self.getdata(**kwargs)) return mdata def _masked_any(self, other, mask_outside=config.mask_outside, **kwargs): """ Get a copy of self data and **other** data, with masked data where any of them is masked. :param mask_outside: if None, mask is empty; else, mask data outside +/- this value """ data = self._masked_data(mask_outside, **kwargs) otherdata = other._masked_data(mask_outside, **kwargs) cmask = numpy.logical_or(data.mask, otherdata.mask) data.mask = cmask otherdata.mask = cmask return data, otherdata
[docs] def correlation(self, other, commonmask=False, mask_outside=config.mask_outside): """ Compute a correlation coefficient R to another field. :param commonmask: if True, compute distance on the subset of point that are not masked for any of the two fields. :param mask_outside: if None, mask is empty; else, mask data outside +/- this value """ # TODO: treat more complicated cases, where mask is present but commonmask is False, # or not present but to be masked without commonmask... from bronx.syntax.arrays import stretch_array otherdata = other.data selfdata = self.data if commonmask: if not isinstance(selfdata, numpy.ma.masked_array): selfdata = numpy.ma.masked_outside(selfdata, -mask_outside, mask_outside) if not isinstance(otherdata, numpy.ma.masked_array): otherdata = numpy.ma.masked_outside(otherdata, -mask_outside, mask_outside) cmask = numpy.logical_or(selfdata.mask, otherdata.mask) selfdata.mask = cmask otherdata.mask = cmask otherdata = stretch_array(otherdata) selfdata = stretch_array(selfdata) if not commonmask and otherdata.shape != selfdata.shape: raise Exception('inconsistency between masks') r = numpy.corrcoef(selfdata, otherdata)[0,1] return r
def _check_operands(self, other): """ Internal method to check compatibility of terms in operations on fields. """ if isinstance(other, self.__class__) or isinstance(self, other.__class__): if numpy.shape(self._data) != numpy.shape(other._data): raise epygramError("dimensions mismatch.") else: try: other = float(other) except Exception: raise ValueError("operations on " + self.__class__.__name__ + " must involve either scalars " + "(integer/float) or " + self.__class__.__name__ + ".") def _add(self, other, **kwargs): """ Definition of addition, 'other' being: - a scalar (integer/float) - another Field of the same subclass. Returns a new Field whose data is the resulting operation, with 'fid' = {'op':'+'} and null validity. """ self._check_operands(other) if isinstance(other, self.__class__): rhs = other._data else: rhs = other result = self._data + rhs kwargs.setdefault('fid', {'op':'+'}) newfield = footprints.proxy.field(**kwargs) newfield.setdata(result) return newfield def _mul(self, other, **kwargs): """ Definition of multiplication, 'other' being: - a scalar (integer/float) - another Field of the same subclass. Returns a new Field whose data is the resulting operation, with 'fid' = {'op':'*'} and null validity. """ self._check_operands(other) if isinstance(other, self.__class__): rhs = other._data else: rhs = other result = self._data * rhs newid = {'op':'*'} newfield = footprints.proxy.field(fid=newid, **kwargs) newfield.setdata(result) return newfield def _sub(self, other, **kwargs): """ Definition of substraction, 'other' being: - a scalar (integer/float) - another Field of the same subclass. Returns a new Field whose data is the resulting operation, with 'fid' = {'op':'-'} and null validity. """ self._check_operands(other) if isinstance(other, self.__class__): rhs = other._data else: rhs = other result = self._data - rhs kwargs.setdefault('fid', {'op':'-'}) newfield = footprints.proxy.field(**kwargs) newfield.setdata(result) return newfield def _rsub(self, other, **kwargs): """ Definition of reverse substraction, 'other' being: - a scalar (integer/float) - another Field of the same subclass. Returns a new Field whose data is the resulting operation, with 'fid' = {'op':'-'} and null validity. """ self._check_operands(other) if isinstance(other, self.__class__): rhs = other._data else: rhs = other result = rhs - self._data newid = {'op':'-'} newfield = footprints.proxy.field(fid=newid, **kwargs) newfield.setdata(result) return newfield def _div(self, other, **kwargs): """ Definition of division, 'other' being: - a scalar (integer/float) - another Field of the same subclass. Returns a new Field whose data is the resulting operation, with 'fid' = {'op':'/'} and null validity. """ self._check_operands(other) if isinstance(other, self.__class__): rhs = other._data else: rhs = other result = self._data / rhs newid = {'op':'/'} newfield = footprints.proxy.field(fid=newid, **kwargs) newfield.setdata(result) return newfield def _rdiv(self, other, **kwargs): """ Definition of reverse division, 'other' being: - a scalar (integer/float) - another Field of the same subclass. Returns a new Field whose data is the resulting operation, with 'fid' = {'op':'/'} and null validity. """ self._check_operands(other) if isinstance(other, self.__class__): rhs = other._data else: rhs = other result = rhs / self._data newid = {'op':'/'} newfield = footprints.proxy.field(fid=newid, **kwargs) newfield.setdata(result) return newfield # default behaviors def __add__(self, other): return self._add(other) def __mul__(self, other): return self._mul(other) def __sub__(self, other): return self._sub(other) def __div__(self, other): return self._div(other) __radd__ = __add__ __rmul__ = __mul__ def __rsub__(self, other): return self._rsub(other) def __rdiv__(self, other): return self._rdiv(other)
[docs]class FieldSet(RecursiveObject, list): """ Handles a set of Fields, in the manner of Python's builtin list, with some extra features, especially ensuring its components all are Fields. Constructor optional argument **fields** has to be either a :class:`Field` or an iterable of. """ def __init__(self, fields=()): """ Constructor. Checks that optional 'fields' argument is actually iterable and contains Field instances, or is a single Field. """ if fields == (): pass elif isinstance(fields, Field): fields = (fields,) else: try: for item in fields: if not isinstance(item, Field): raise epygramError("A FieldSet can only be made out" + " of Field instances.") except TypeError: raise epygramError("'fields' argument must be either a" + " Field instance or an iterable of fields.") except Exception: raise super(FieldSet, self).__init__(fields) def __setitem__(self, position, field): if not isinstance(field, Field): raise epygramError("A FieldSet can contain only Field instances.") super(FieldSet, self).__setitem__(position, field) def __setslice__(self, pos1, pos2, fieldset): if not isinstance(fieldset, FieldSet): raise epygramError("'fieldset' argument must be of kind FieldSet.") super(FieldSet, self).__setslice__(pos1, pos2, fieldset)
[docs] def append(self, field): """ Checks that *field* is a :class:`Field` instance before appending it. """ if not isinstance(field, Field): raise epygramError("A FieldSet can contain only Field instances.") super(FieldSet, self).append(field)
[docs] def index(self, fid): """ Returns the index of the first field of the FieldSet whose fid matches **fid**, **fid** being a dict. """ if not isinstance(fid, dict): raise ValueError("'fid' must be a dict.") idx = None for f in range(0, len(self)): if self[f].fid == fid: idx = f break return idx
[docs] def extend(self, fieldset): """ Checks that **fieldset** is a :class:`FieldSet` instance before extending with it. """ if not isinstance(fieldset, FieldSet): raise epygramError("'fieldset' argument must be of kind FieldSet.") super(FieldSet, self).extend(fieldset)
[docs] def insert(self, position, field): """ Checks that **field** is a :class:`Field` instance before inserting it at the **position**. """ if not isinstance(field, Field): raise epygramError("A FieldSet can contain only Field instances.") super(FieldSet, self).insert(position, field)
[docs] def remove(self, fid): """ Removes from the FieldSet the first field whose fid matches **fid**, **fid** being a dict. """ try: idx = self.index(fid) del self[idx] except Exception: pass
[docs] def sort(self, attribute, key=None, reverse=False): """ Sorts the fields of the FieldSet by the increasing criterion. If attribute is a string, sorting will be done according to *field.attribute[key]* or *field.attribute* (if **key** is **None**). If attribute is a list *[a1, a2...]*, sorting will be done according to *field.a1.a2[key]* or *field.a1.a2* (if *key==None*). If *reverse* is *True*, sorts by decreasing order. """ if isinstance(attribute, six.string_types): if key is None: def cmpfct(x, y): cmp(x._attributes[attribute], y._attributes[attribute]) else: def cmpfct(x, y): cmp(x._attributes[attribute][key], y._attributes[attribute][key]) elif isinstance(attribute, list): a = attribute if isinstance(self[0]._attributes[a[0]], FootprintBase): if len(attribute) == 2: if key is None: def cmpfct(x, y): cmp(x._attributes[a[0]]._attributes[a[1]], y._attributes[a[0]]._attributes[a[1]]) else: def cmpfct(x, y): cmp(x._attributes[a[0]]._attributes[a[1]][key], y._attributes[a[0]]._attributes[a[1]][key]) elif len(attribute) == 3: if key is None: def cmpfct(x, y): cmp(x._attributes[a[0]]._attributes[a[1]].__dict__[a[2]], y._attributes[a[0]]._attributes[a[1]].__dict__[a[2]]) else: def cmpfct(x, y): cmp(x._attributes[a[0]]._attributes[a[1]].__dict__[a[2]][key], y._attributes[a[0]]._attributes[a[1]].__dict__[a[2]][key]) else: raise NotImplementedError("len(attribute) > 3.") else: if len(attribute) == 2: if key is None: def cmpfct(x, y): cmp(x._attributes[a[0]].__dict__[a[1]], y._attributes[a[0]].__dict__[a[1]]) else: def cmpfct(x, y): cmp(x._attributes[a[0]].__dict__[a[1]][key], y._attributes[a[0]].__dict__[a[1]][key]) elif len(attribute) == 3: if key is None: def cmpfct(x, y): cmp(x._attributes[a[0]].__dict__[a[1]].__dict__[a[2]], y._attributes[a[0]].__dict__[a[1]].__dict__[a[2]]) else: def cmpfct(x, y): cmp(x._attributes[a[0]].__dict__[a[1]].__dict__[a[2]][key], y._attributes[a[0]].__dict__[a[1]].__dict__[a[2]][key]) else: raise NotImplementedError("len(attribute) > 3.") else: raise TypeError("attribute must be a string or list of string.") super(FieldSet, self).sort(cmp=cmpfct, reverse=reverse)
[docs] def listfields(self, fidkey=None): """ Returns a list of the identifiers of the FieldSet. If *fidkey* is supplied, the list contains only **fid[*fidkey*]**, and not whole fid. """ if fidkey is None: fieldslist = [f.fid for f in self] else: fieldslist = [f.fid[fidkey] for f in self] return fieldslist
[docs] def filter(self, by, criteria): """ Not Implemented Yet. Returns a new FieldSet filtered according to criteria specified in argument. Args: - by: the kind of filter; on what to filter ? - criteria: how to filter on that ? Examples of filters: - by='id', criteria={typefmt:identifier} will return only fields whose id[typefmt] match value... """ raise NotImplementedError("not yet...")
[docs]class Resource(RecursiveObject, FootprintBase): """Generic abstract class implementing a Resource.""" _abstract = True _collector = ('epyresource',) _footprint = dict( attr=dict( openmode=dict( values=set(['r', 'read', 'w', 'write', 'a', 'append']), remap=dict( read='r', write='w', append='a'), info="Opening mode.") ) ) def __init__(self, *args, **kwargs): """Constructor. See its footprint for arguments.""" super(Resource, self).__init__(*args, **kwargs) def __enter__(self): """Context enter.""" return self def __exit__(self, t, v, tbk): # @UnusedVariables """Context exit.""" self.close() def __del__(self): """Destructor. Closes the resource properly.""" try: self.close() except Exception as e: epylog.warning( "Exception catched in epygram.base.Resource.__del__(): " + str(e)) def __len__(self): """Returns the number of fields in resource.""" return len(self.listfields()) def __contains__(self, fid): return fid in self.listfields() def __iter__(self): """Caution: iteration returns the fields one by one, read from file !""" for f in self.listfields(): yield self.readfield(f)
[docs] def readfields(self, requestedfields, getdata=True): """ Returns a :class:`FieldSet` containing requested fields read in the resource. :param requestedfields: a field identifier of the resource format, or a list of. :param getdata: optional, if *False*, only metadata are read, the fields do not contain data. Default is *True*. """ fieldset = FieldSet() if isinstance(requestedfields, list): for f in requestedfields: fieldset.append(self.readfield(f, getdata=getdata)) else: fieldset.append(self.readfield(requestedfields, getdata=getdata)) return fieldset
[docs] def writefields(self, fieldset): """ Write the fields of the 'fieldset' in the resource; *fieldset* must be a :class:`FieldSet` instance. """ if not isinstance(fieldset, FieldSet): raise epygramError("'fieldset' argument must be a FieldSet " + "instance.") for field in fieldset: self.writefield(field)
[docs] def listfields(self, **kwargs): """ Returns a list containing the identifiers (in the resource format) of all the fields of the resource. (Generic wrapper with buffering if openmode == 'r'.) """ if self.openmode == 'r' and not hasattr(self, '_bufferedlistfields'): self._bufferedlistfields = [] elif self.openmode in ('w', 'a'): # list of fields subject to evolution; no buffering self._bufferedlistfields = None if self._bufferedlistfields and self._bufferedlistfieldsoptions != kwargs: self._bufferedlistfields = [] if not self._bufferedlistfields: fieldslist = self._listfields(**kwargs) self._bufferedlistfieldsoptions = copy.deepcopy(kwargs) if self._bufferedlistfields is not None: self._bufferedlistfields = fieldslist # save else: # immutable and already read fieldslist = self._bufferedlistfields return fieldslist
def _listfields(self): """ Actual listfields() method (virtual). """ raise NotImplementedError("virtual method.")
[docs] def find_fields_in_resource_by_generic_fid(self, handgrip): """ Find in resource the fields whose generic fid (if the resource is able to give one) matches the *handgrip*. """ fieldslist = self.listfields(complete=True) found = [] for f in fieldslist: if all([f['generic'][k] == handgrip[k] for k in handgrip.keys()]): found.append(f) return found
[docs]class FieldValidity(RecursiveObject): """ This class handles a unique temporal validity for a meteorological field: its date and time of validity (*date_time*), as well as the validity of its origin (*basis*, i.e. for a forecast field for instance, the beginning of the forecast) and its *term*. An additional optional *cumulativeduration* parameter can define the duration for which cumulative fields (e.g. precipitation) are valid. If such, optional *statistical_process_on_duration* can be supplied as a string or an int (cf. GRIB2 table typeOfStatisticalProcessing) to describe the kind of statistical process that runs over the *cumulativeduration*. Constructor arguments: cf. *set()* method. """ def __init__(self, date_time=None, basis=None, term=None, cumulativeduration=None, statistical_process_on_duration=None, statistical_time_increment=None): """ Constructor. :param date_time: has to be of type datetime.datetime; :param basis: has to be of type datetime.datetime; :param term: has to be of type datetime.timedelta; :param cumulativeduration: has to be of type datetime.timedelta; :param statistical_process_on_duration: kind of statistical process that runs over the cumulative duration. :param statistical_time_increment: time step over used for statistical process. """ self._basis = None self._date_time = None self._cumulativeduration = None self._statistical_process_on_duration = None self._statistical_time_increment = None kwargs = dict(date_time=date_time, basis=basis, term=term, cumulativeduration=cumulativeduration, statistical_process_on_duration=statistical_process_on_duration, statistical_time_increment=statistical_time_increment) if not (date_time is None and basis is None and term is None): self.set(**kwargs)
[docs] def term(self, fmt=None): """ This method returns the term as the difference between date and time of validity and basis. By default, it is returned as a :class:`datetime.timedelta`; otherwise, *fmt* argument can specify the desired return format. Coded versions of *fmt*: 'IntHours', 'IntSeconds', and that's all for now... """ if fmt is None: out = self._date_time - self._basis elif fmt == 'IntHours': term = self._date_time - self._basis out = int(term.total_seconds() // 3600) elif fmt == 'IntSeconds': term = self._date_time - self._basis out = int(term.total_seconds()) else: raise NotImplementedError("fmt=" + fmt + " option for " + self.__class__.__name__ + ".term().") return out
[docs] def cumulativeduration(self, fmt=None): """ This method returns the cumulative duration, i.e. the duration for which cumulative fields (e.g. precipitation) are valid. By default, it is returned as a :class:`datetime.timedelta`; otherwise, *fmt* argument can specify the desired return format. Coded versions of *fmt*: 'IntHours', 'IntSeconds', and that's all for now... """ if fmt is None: out = self._cumulativeduration elif fmt == 'IntHours': out = int(self._cumulativeduration.total_seconds() // 3600) elif fmt == 'IntSeconds': out = int(self._cumulativeduration.total_seconds()) else: raise NotImplementedError("fmt=" + fmt + " option for " + self.__class__.__name__ + ".cumulativeduration().") return out
[docs] def statistical_process_on_duration(self, asGRIB2code=False): """ If the field describes a cumulative process over a cumulativeduration, returns the kind of statistical process that runs over the duration. If *asGRIB2code*, returned as a GRIB2 code (cf. GRIB2 table 4.10). """ from epygram.extra import griberies if not asGRIB2code and isinstance(self._statistical_process_on_duration, int): out = griberies.tables.statistical_processes.get(self._statistical_process_on_duration, None) elif asGRIB2code and isinstance(self._statistical_process_on_duration, six.string_types): out = {v:k for k, v in griberies.tables.statistical_processes.items()}.get(self._statistical_process_on_duration, None) else: out = self._statistical_process_on_duration return out
[docs] def statistical_time_increment(self, fmt=None): """ This method returns the statistical_time_increment, i.e. the time step used for statistical process over cumulative duration. By default, it is returned as a :class:`datetime.timedelta`; otherwise, *fmt* argument can specify the desired return format. Coded versions of *fmt*: 'IntHours', 'IntSeconds', and that's all for now... """ if fmt is None: out = self._statistical_time_increment elif fmt == 'IntHours': out = int(self._statistical_time_increment.total_seconds() // 3600) elif fmt == 'IntSeconds': out = int(self._statistical_time_increment.total_seconds()) else: raise NotImplementedError("fmt=" + fmt + " option for " + self.__class__.__name__ + ".statistical_time_increment().") return out
[docs] def get(self, fmt=None): """ Returns the date and time of validity. By default, as a :class:`datetime.datetime`; otherwise, *fmt* argument can specify the desired return format. Coded versions of *fmt*: 'IntStr' (e.g. '20140731104812' = 2014 july 31th at 10h, 48m, 12s). And that's all for now... """ if fmt is None: out = self._date_time elif fmt == 'IntStr': out = '{:0>{width}}'.format(str(self._date_time.year), width=4) \ + '{:0>{width}}'.format(str(self._date_time.month), width=2) \ + '{:0>{width}}'.format(str(self._date_time.day), width=2) \ + '{:0>{width}}'.format(str(self._date_time.hour), width=2) \ + '{:0>{width}}'.format(str(self._date_time.minute), width=2) \ + '{:0>{width}}'.format(str(self._date_time.second), width=2) else: raise NotImplementedError("fmt=" + fmt + " option for " + self.__class__.__name__ + ".get().") return out
[docs] def getbasis(self, fmt=None): """ Returns the date and time of origin (basis). By default, as a :class:`datetime.datetime`; otherwise, *fmt* argument can specify the desired return format. Coded versions of *fmt*: 'IntStr' (e.g. '20140731104812' = 2014 july 31th at 10h, 48m, 12s). And that's all for now... """ if fmt is None: out = self._basis elif fmt == 'IntStr': out = '{:^{width}}'.format(str(self._basis.year), width=4) \ + '{:0>{width}}'.format(str(self._basis.month), width=2) \ + '{:0>{width}}'.format(str(self._basis.day), width=2) \ + '{:0>{width}}'.format(str(self._basis.hour), width=2) \ + '{:0>{width}}'.format(str(self._basis.minute), width=2) \ + '{:0>{width}}'.format(str(self._basis.second), width=2) else: raise NotImplementedError("fmt=" + fmt + " option for " + self.__class__.__name__ + ".getbasis().") return out
[docs] def set(self, date_time=None, basis=None, term=None, cumulativeduration=None, statistical_process_on_duration=None, statistical_time_increment=None): """ Sets validity and basis according to arguments. A consistency check is done if the three arguments are provided (which is useless anyway). Args: \n :param date_time: has to be of type :class:`datetime.datetime` :param basis: has to be of type :class:`datetime.datetime` :param term: has to be of type :class:`datetime.timedelta` :param cumulativeduration: has to be of type :class:`datetime.timedelta` :param statistical_process_on_duration: kind of statistical process that runs over the cumulative duration. Cf. GRIB2 typeOfStatisticalProcessing :param statistical_time_increment: time step over used for statistical process. """ if isinstance(date_time, datetime.datetime): self._date_time = date_time elif date_time is not None: raise epygramError("argument 'date_time' must be of type" + " datetime.datetime") if isinstance(basis, datetime.datetime): self._basis = basis elif basis is not None: raise epygramError("argument 'basis' must be of type" + " datetime.datetime") if term is not None and not isinstance(term, datetime.timedelta): raise epygramError("argument 'term' must be of type" + " datetime.timedelta") if cumulativeduration is not None and\ not isinstance(cumulativeduration, datetime.timedelta): raise epygramError("argument 'cumulativeduration' must be of" + " type datetime.timedelta") if statistical_time_increment is not None and\ not isinstance(statistical_time_increment, datetime.timedelta): raise epygramError("argument 'statistical_time_increment' must be of" + " type datetime.timedelta") if isinstance(term, datetime.timedelta): if date_time is not None and basis is not None and term is not None \ and date_time - basis != term: raise epygramError("inconsistency between 'term', 'basis'" + " and 'date_time' arguments.") if self._date_time is None: if self._basis is None: raise epygramError("cannot set 'term' without 'basis'" + " nor 'date_time'.") else: self._date_time = self._basis + term else: if self._basis is None: self._basis = self._date_time - term else: self._date_time = self._basis + term if cumulativeduration is not None: self._cumulativeduration = cumulativeduration if self._cumulativeduration is not None and \ statistical_process_on_duration is not None: self._statistical_process_on_duration = statistical_process_on_duration if statistical_time_increment is not None: self._statistical_time_increment = statistical_time_increment
[docs] def is_valid(self): """Check the validity is valid, i.e. not null.""" return self.get() is not None
[docs]class FieldValidityList(RecursiveObject, list): """ This class handles a list of temporal validity. """ def __init__(self, validity_instance=None, length=1, **kwargs): """ Constructor. - *validity_instance*, if given is an instance of FieldValidity - *length*, to build a series of validities from either the *validity_instance* or from an uninitialized one. - other kwargs: same as :class:`FieldValidity` constructor. """ list.__init__(self, []) if validity_instance is not None: if len(kwargs) != 0: raise epygramError("One can not give, at the same time, validity_instance and other argument.") failed = False if isinstance(validity_instance, FieldValidity): self.append(validity_instance) if length > 1: for _ in range(length - 1): self.append(validity_instance.deepcopy()) elif isinstance(validity_instance, FieldValidityList): self.extend(validity_instance) elif isinstance(validity_instance, list): if all([isinstance(f, FieldValidity) for f in validity_instance]): self.extend(validity_instance) else: failed = True else: failed = True if failed: raise epygramError("FieldValidityList must be built from" + " FieldValidity, from FieldValidityList" + " instances or from a list of FieldValidity.") elif kwargs != {}: # Check that all lengths are equal length = None mykwargs = {} for k, v in kwargs.items(): mykwargs[k] = [v] if not isinstance(v, list) else v if length is None or length == 1: length = len(mykwargs[k]) if len(mykwargs[k]) != length: raise epygramError("All the arguments must have the same length.") for k, v in mykwargs.items(): if len(v) == 1: mykwargs[k] = mykwargs[k] * length # We set the different objects if length is None: length = 1 self.extend([FieldValidity(**{key: value[i] for (key, value) in mykwargs.items()}) for i in range(length)]) elif isinstance(length, int): for _ in range(length): self.append(FieldValidity()) def __str__(self): strout = '<List of FieldValidity which date/time are:\n' for v in self: strout += str(v.get()) + '\n' strout += '>' return strout def __getitem__(self, key): result = super(FieldValidityList, self).__getitem__(key) if isinstance(key, slice): result = FieldValidityList(result) elif not isinstance(key, int): raise TypeError("*key* should be of type 'int' or 'slice'") return result def __getslice__(self, start, end): # deprecated but not for some builtins such as list... result = super(FieldValidityList, self).__getitem__(slice(start, end)) return FieldValidityList(result) def __eq__(self, other): if isinstance(other, self.__class__) and len(self) == len(other): return all([v == other[i] for i,v in enumerate(self)]) else: return False def __hash__(self): # known issue __eq__/__hash__ must be defined both or none, else inheritance is broken return object.__hash__(self)
[docs] def recursive_diff(self, other): """Recursively list what differs from **other**.""" if self != other: if not isinstance(other, self.__class__) or len(self) != len(other): return (str(self), str(other)) else: return [v.recursive_diff(other[i]) for i,v in enumerate(self)]
[docs] def term(self, one=True, **kwargs): """This method returns the terms of all the validities""" length = len(self) result = [self[i].term(**kwargs) for i in range(length)] return result[0] if (one and length == 1) else result
[docs] def cumulativeduration(self, one=True, **kwargs): """This method returns the cumulative duration of all the validities.""" length = len(self) result = [self[i].cumulativeduration(**kwargs) for i in range(length)] return result[0] if (one and length == 1) else result
[docs] def statistical_process_on_duration(self, one=True, **kwargs): """This method returns the statistical process on duration of all the validities.""" length = len(self) result = [self[i].statistical_process_on_duration(**kwargs) for i in range(length)] return result[0] if (one and length == 1) else result
[docs] def statistical_time_increment(self, one=True, **kwargs): """This method returns the statistical_time_increment of all the validities.""" length = len(self) result = [self[i].statistical_time_increment(**kwargs) for i in range(length)] return result[0] if (one and length == 1) else result
[docs] def get(self, one=True, **kwargs): """Returns the date and time of all the validities.""" length = len(self) result = [self[i].get(**kwargs) for i in range(length)] return result[0] if (one and length == 1) else result
[docs] def getbasis(self, one=True, **kwargs): """Returns the date and time of origin (basis) of all the validities.""" length = len(self) result = [self[i].getbasis(**kwargs) for i in range(length)] return result[0] if (one and length == 1) else result
[docs] def set(self, **kwargs): """Sets validity objects""" # Check that all lengths are equal length = None mykwargs = {} for k, v in kwargs.items(): mykwargs[k] = [v] if not isinstance(v, list) else v if length is None or length == 1: length = len(mykwargs[k]) if len(mykwargs[k]) != length: raise epygramError("All the arguments must have the same length.") if length == 1: length = len(self) for k, v in mykwargs.items(): if len(v) == 1: mykwargs[k] = mykwargs[k] * length # We set the different objects for i in range(length): self[i].set(**{key: value[i] for (key, value) in mykwargs.items()})
[docs] def is_valid(self): """Check the validity is valid, i.e. not null.""" return all([self[i].is_valid() for i in range(len(self))])
[docs] def what(self, out=sys.stdout, cumulativeduration=True): """ Writes in file a summary of the validity. :param out: the output open file-like object. :param cumulativeduration: if False, not written. """ out.write("################\n") out.write("### VALIDITY ###\n") out.write("################\n") write_formatted(out, "Validity", self.get()) write_formatted(out, "Basis", self.getbasis()) try: write_formatted(out, "Term", self.term()) except TypeError: pass if cumulativeduration: write_formatted(out, "Duration for cumulative quantities", self.cumulativeduration()) write_formatted(out, "Statistical process", self.statistical_process_on_duration()) write_formatted(out, "(GRIB2 code -- table 4.10)", self.statistical_process_on_duration(asGRIB2code=True)) out.write(separation_line) out.write("\n")