Source code for scripts.extract.obs.bdquery

# -*- coding: utf-8 -*-

import psycopg2
from snowtools.utils.dates import check_and_convert_date


[docs] class question(object): """ Class for interrogating PostgreSQL database Useful for accessing BDClim at Meteo-France. Please consider using directly ``psycopg2`` or ``pandas`` for new projects and make sure you are familiar with SQL syntax before using as inputs are not always correctly sanitized. Usecase example: Read all snow depth information for poste nr 38005402 (Nivose Aigleton) and 38567002 (Chamrousse) for the season 2020-2021 and save it to a file: .. code:: python question1 = question( # Variables to extract: here date, num_poste on 8 digits, poste name, snow depth listvar=["to_char(dat,'YYYY-MM-DD-HH24-MI')", "to_char(h.num_poste, 'fm00000000')", "poste_nivo.nom_usuel", "neigetot"], # The table to read (see BDClim documentation) table='H', # Join table num_poste to get the poste name listjoin=['POSTE_NIVO ON H.NUM_POSTE = POSTE_NIVO.NUM_POSTE'], # Set a time period of interest (datetime or str objects) period=['2020-08-01', '2021-08-01'], # Set the order listorder=['dat', 'h.num_poste'], # Filtering # parenthesis compulsory to group with "OR" listconditions=['(H.NUM_POSTE=38005402 OR H.NUM_POSTE=38567002)'], ) # Save into a csv file question1.run(outputfile='output.csv', header=['dat', 'num_poste', 'poste', 'neigetot']) """ def __init__(self, host='bdclimop-usr.meteo.fr', port=5432, user='anonymous', password='anonymous', database='bdclim', listvar=[], listorder=[], listconditions=[], listjoin=[], period=[], table=None, dateformat='iso'): """ :param host: Database server hostname :type host: str :param port: Database server port :type port: int :param user: Database username :type user: str :param password: Database password :type password: str :param database: Database name to use on the server :type database: str :param listvar: List of variables to read :type listvar: list of str :param listorder: List of variables to order on :type listorder: list of str :param listconditions: List of conditions to apply for data selection (SQL format) :type listconditions: list of str :param listjoin: List of join to be done (SQL format without 'JOIN') :type join: list of str :param period: List [[Min date, [max_date]]] for filtering on dates (field dat, else, put it in conditions please) :type period: list of str or datetime :param table: The table to read in the selected database :type table: str :param dateformat: Date format (iso, daily or monthly) :type dateformat: str """ if listconditions is None: listconditions = [] self.host = host self.port = port self.user = user self.password = password self.database = database self.variables = listvar self.order = listorder self.conditions = listconditions self.join = listjoin self.period = period self.table = table self.dateformat = dateformat self.sql = None
[docs] def set_varout(self, listvar): """ Set the list of desired output variables """ self.variables = listvar
[docs] def set_order(self, listvar): """ Set the list of sorting variables """ if isinstance(listvar, list): self.order = listvar else: self.order.append(listvar)
[docs] def set_table(self, table): """ Define the table """ self.table = table
[docs] def set_period(self, datedeb, datefin): """ Define the period with begin and end date :param datedeb: Begin date :type datedeb: str or datetime object :param datefin: End date :type datefin: str or datetime object """ self.period = [datedeb, datefin]
[docs] def set_condition(self, condition): """ Set a condition, written as a SQL condition """ self.conditions.append(condition)
[docs] def set_join(self, join_str): """ Set a join of table, written as a SQL JOIN. Example: ``text ON text.id = id_text`` """ self.join.append(join_str)
def _format_date(self, date): if self.dateformat == 'iso': return date.isoformat(sep=' ') elif self.dateformat == 'daily': return date.strftime('%Y%m%d') elif self.dateformat == 'monthly': return date.strftime('%Y%m') else: raise ValueError('Date format must be one of iso, daily, monthly')
[docs] def get(self, outputfile=None, sep=';', header=True, mode='w'): """ Run the query to the database and return an array of results (or write the results in a csv file). Note that the sql query is written in ``sql`` attribute of the object. :param outputfile: Output csv file in which to write the results. If set to None, an array of results is returned :type outputfile: str or path-like :param sep: Separator in case of csv output is specified :type sep: one-character str :param header: Headers in case of CSV file. Defaults to True (put the headers), can also be False to desactivate headers or a list to override default ones :type header: bool or list :param mode: Mode for opening outputfile :type mode: str :returns: array of results (or nothing in case outputfile is set) """ # construction of SQL request self.sql = 'SELECT {fields} FROM {table}'.format(fields=','.join(self.variables), table=self.table) if len(self.join) > 0: self.sql += ' JOIN {}'.format(' JOIN '.join(self.join)) if len(self.conditions) > 0 or len(self.period) > 0: conditions = self.conditions if len(self.period) > 0: if isinstance(self.period[0], str): date = check_and_convert_date(self.period[0]) else: date = self.period[0] date_formatted = self._format_date(date) conditions += [f'{self.table}.DAT >= \'{date_formatted}\''] if len(self.period) > 1: if isinstance(self.period[1], str): date = check_and_convert_date(self.period[1]) else: date = self.period[1] date_formatted = self._format_date(date) conditions += [f'{self.table}.DAT <= \'{date_formatted}\''] self.sql += ' WHERE {}'.format(' AND '.join(self.conditions)) if len(self.order) > 0: self.sql += ' ORDER BY {}'.format(','.join(self.order)) print('Lauching the following querry : \n', self.sql) # Connexion to database with psycopg2.connect( host=self.host, user=self.user, password=self.password, port=self.port, database=self.database ) as conn: cursor = conn.cursor() cursor.execute(self.sql) if outputfile is None: data = cursor.fetchall() return data else: import csv with open(outputfile, mode, newline='') as f: writer = csv.writer(f, delimiter=sep) if header is True: writer.writerow(self.variables) elif header is not False: writer.writerow(header) writer.writerows(cursor)
[docs] def run(self, *args, **kwargs): """ For backward compatibility only. Please do not use. meta:private """ return self.get(*args, **kwargs)