Source code for romancal.lib.engdb.engdb_mast

"""Access the Roman Engineering Mnemonic Database through MAST."""

import logging
from ast import literal_eval
from os import getenv

import numpy as np
import requests
from astropy.table import Table
from astropy.time import Time
from requests.adapters import HTTPAdapter, Retry

from .engdb_lib import (
    FORCE_STATUSES,
    RETRIES,
    TIMEOUT,
    EngdbABC,
    ValueCollection,
)

__all__ = ["EngdbMast"]

# Fallback service location at MAST, used when neither the caller nor the
# environment supplies a value.
MAST_BASE_URL = "https://mast.stsci.edu"

# Both endpoints live under the same API root; only the final path
# component differs between engineering data and mnemonic metadata.
_MNEMONICS_API_ROOT = "edp/api/v0.1/mnemonics/spa/roman"
DATA_URI = f"{_MNEMONICS_API_ROOT}/data"
META_URI = f"{_MNEMONICS_API_ROOT}/metadata"

# Module-level logger. The NullHandler gives the logger a handler of its own
# without producing any output.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


class EngdbMast(EngdbABC):
    """
    Access the Roman Engineering Database through MAST.

    Parameters
    ----------
    eng_base_url : str
        The base url for the engineering RESTful service. If not defined,
        the environmental variable ENG_BASE_URL is queried. Otherwise
        the default MAST website is used.

    data_uri : str or None
        The URI component required to retrieve the engineering data. This is
        concatenated to the `eng_base_url`. If not defined, the environment
        variable ENG_DATA_URI is queried. Otherwise, the default data URI is
        used.

    meta_uri : str or None
        The URI component required to retrieve the engineering metadata. This
        is concatenated to the `eng_base_url`. If not defined, the environment
        variable ENG_META_URI is queried. Otherwise, the default metadata URI
        is used.

    token : str or None
        The MAST access token. If not defined, the environmental variable
        MAST_API_TOKEN is queried. For ground operational processing,
        MAST_AUTH_TOKEN is also queried. For more information, see
        https://auth.mast.stsci.edu/

    rsdp_auth : bool
        True to use RSDP authentication.

    check_aliveness : boolean
        Check if the service is actually reachable.

    **service_kwargs : dict
        Service-specific keyword arguments that are not relevant to this
        implementation of EngdbABC.

    Raises
    ------
    RuntimeError
        Any and all failures with connecting with the MAST server.
    """

    #: The base URL for the engineering service.
    eng_base_url = None

    #: The end time of the last query.
    endtime = None

    #: The results of the last query.
    response = None

    #: The results of the last metadata query.
    metaresponse = None

    #: Number of retries to attempt to contact the service
    retries = RETRIES

    #: The start time of the last query.
    starttime = None

    #: Network timeout when communicating with the service
    timeout = TIMEOUT

    #: MAST Token
    token = None

    #: Use RSDP authentication
    rsdp_auth = False

    #: DATA endpoint URI
    data_uri = None

    #: METADATA endpoint URI
    meta_uri = None

    def __init__(
        self,
        eng_base_url=None,
        data_uri=None,
        meta_uri=None,
        token=None,
        rsdp_auth=False,
        check_aliveness=True,
        **service_kwargs,
    ):
        logger.debug("kwargs not used by this service: %s", service_kwargs)

        self._configure(
            eng_base_url=eng_base_url,
            data_uri=data_uri,
            meta_uri=meta_uri,
            token=token,
            rsdp_auth=rsdp_auth,
        )
        self._set_session()

        if check_aliveness:
            self.isalive()

    def get_meta(self, search=None):
        """
        Get the mnemonics meta info.

        Parameters
        ----------
        search : str or None
            A partial, or full, mnemonic specification.
            If None, meta for all available mnemonics are returned

        Returns
        -------
        meta : dict
            The meta information. The primary keys are "Count" and
            "TlmMnemonics". The count is the number of mnemonics returned and
            TlmMnemonics is a list of dicts containing information for each
            mnemonic.

        Raises
        ------
        requests.exceptions.RequestException
            If the request cannot be completed or the service answers with
            an error status.
        """
        # An empty parameter set asks the service for every mnemonic.
        self._metareq.params = {} if search is None else {"mnemonic": search}

        # The response is stored before the status check so that it remains
        # available for inspection even when the check raises.
        self.metaresponse = self._send_request(self._metareq)
        self.metaresponse.raise_for_status()
        logger.debug("Response: %s", self.metaresponse)
        logger.debug("Response test: %s", self.metaresponse.text)

        # Leave as dictionary.
        return self._decode(self.metaresponse)

    def get_values(
        self,
        mnemonic,
        starttime,
        endtime,
        time_format=None,
        include_obstime=False,
        include_bracket_values=False,
        zip_results=True,
    ):
        """
        Retrieve all results for a mnemonic in the requested time range.

        Parameters
        ----------
        mnemonic : str
            The engineering mnemonic to retrieve

        starttime : str or `astropy.time.Time`
            The, inclusive, start time to retrieve from.

        endtime : str or `astropy.time.Time`
            The, inclusive, end time to retrieve from.

        time_format : str
            The format of the input time used if the input times
            are strings. If None, a guess is made.

        include_obstime : bool
            If `True`, the return values will include observation
            time as `astropy.time.Time`. See `zip_results` for further details.

        include_bracket_values : bool
            The DB service, by default, returns the bracketing
            values outside of the requested time. If `True`, include
            these values.

        zip_results : bool
            If `True` and `include_obstime` is `True`, the return values
            will be a list of 2-tuples. If false, the return will
            be a single 2-tuple, where each element is a list.

        Returns
        -------
        values : [value, ...] or [(obstime, value), ...] or ([obstime,...], [value, ...])
            Returns the list of values. See `include_obstime` and
            `zip_results` for modifications. An empty list is returned when
            the service has no records at all for the request.
        """
        if not isinstance(starttime, Time):
            starttime = Time(starttime, format=time_format)
        if not isinstance(endtime, Time):
            endtime = Time(endtime, format=time_format)

        records = self._get_records(
            mnemonic=mnemonic,
            starttime=starttime,
            endtime=endtime,
            time_format=time_format,
        )
        if len(records) == 0:
            return list()

        # If desired, remove bracket or outside of timeframe entries.
        if not include_bracket_values:
            selection = np.logical_and(
                records["MJD"] >= starttime.mjd, records["MJD"] <= endtime.mjd
            )
            records = records[selection]

        # Reformat to the desired list formatting.
        results = ValueCollection(
            include_obstime=include_obstime, zip_results=zip_results
        )
        values = records["EUValue"]
        obstimes = Time(records["MJD"], format="mjd")
        for obstime, value in zip(obstimes, values, strict=False):
            results.append(obstime, value)

        return results.collection

    def isalive(self):
        """
        Check that the database is reachable.

        Raises
        ------
        RuntimeError
            If the metadata probe fails with any `requests` exception.
        """
        try:
            self.get_meta(search="engdb_mastaliveness")
        except requests.exceptions.RequestException as exception:
            # RequestException is the common base of the requests errors, so
            # read timeouts and exhausted retries are reported the same way
            # as connection and HTTP status failures.
            raise RuntimeError(
                f"MAST url: {self.eng_base_url} is unreachable."
            ) from exception

    def _configure(
        self,
        eng_base_url=None,
        data_uri=None,
        meta_uri=None,
        token=None,
        rsdp_auth=False,
    ):
        """
        Configure from parameters and environment.

        Parameters
        ----------
        eng_base_url : str or None
            The base url for the engineering RESTful service. If not defined,
            the environmental variable ENG_BASE_URL is queried. Otherwise
            the default MAST website is used.

        data_uri : str or None
            The URI component required to retrieve the engineering data. This
            is concatenated to the `eng_base_url`. If not defined, the
            environment variable ENG_DATA_URI is queried. Otherwise, the
            default data URI is used.

        meta_uri : str or None
            The URI component required to retrieve the engineering metadata.
            This is concatenated to the `eng_base_url`. If not defined, the
            environment variable ENG_META_URI is queried. Otherwise, the
            default metadata URI is used.

        token : str or None
            The MAST access token. If not defined, the environmental variable
            MAST_API_TOKEN is queried. For ground operations, MAST_AUTH_TOKEN
            is also queried. For more information, see
            'https://auth.mast.stsci.edu/'

        rsdp_auth : bool
            Use RSDP authentication. Only honored as given when `token` is
            provided; otherwise it is derived from the environment (see the
            token handling below).
        """
        # Determine database access.
        if eng_base_url is None:
            eng_base_url = getenv("ENG_BASE_URL", MAST_BASE_URL)
        # The endpoint URIs are appended directly, so the base must end with
        # a slash. `endswith` also copes with an empty string.
        if not eng_base_url.endswith("/"):
            eng_base_url += "/"
        self.eng_base_url = eng_base_url

        if data_uri is None:
            data_uri = getenv("ENG_DATA_URI", DATA_URI)
        self.data_uri = data_uri

        if meta_uri is None:
            meta_uri = getenv("ENG_META_URI", META_URI)
        self.meta_uri = meta_uri

        # Get the token
        self.rsdp_auth = rsdp_auth
        if token is None:
            # MAST_AUTH_TOKEN takes precedence and, when present, switches
            # RSDP authentication on; when absent, RSDP authentication is
            # switched off regardless of the `rsdp_auth` argument.
            token = getenv("MAST_AUTH_TOKEN", None)
            self.rsdp_auth = token is not None
            if token is None:
                token = getenv("MAST_API_TOKEN", None)
        self.token = token

        # Get various timeout parameters
        self.retries = int(getenv("ENG_RETRIES", RETRIES))
        self.timeout = int(getenv("ENG_TIMEOUT", TIMEOUT))

    def _get_records(
        self, mnemonic, starttime, endtime, time_format=None, **other_kwargs
    ):
        """
        Retrieve all results for a mnemonic in the requested time range.

        Parameters
        ----------
        mnemonic : str
            The engineering mnemonic to retrieve

        starttime : str or astropy.time.Time
            The, inclusive, start time to retrieve from.

        endtime : str or astropy.time.Time
            The, inclusive, end time to retrieve from.

        time_format : str
            The format of the input time used if the input times
            are strings. If None, a guess is made.

        **other_kwargs : dict
            Keyword arguments not relevant to this implementation.

        Returns
        -------
        records : `astropy.Table`
            Returns the resulting table. When the table is not empty, a
            column "MJD" holding the MJD form of "ObsTime" is added.

        Raises
        ------
        ValueError
            If the service reply carries a "detail" entry.

        requests.exceptions.RequestException
            If the request cannot be completed or the service answers with
            an error status.

        Notes
        -----
        The engineering service always returns the bracketing entries
        before and after the requested time range.
        """
        if not isinstance(starttime, Time):
            starttime = Time(starttime, format=time_format)
        if not isinstance(endtime, Time):
            endtime = Time(endtime, format=time_format)
        self.starttime = starttime
        self.endtime = endtime

        # Make the request
        mnemonic = mnemonic.strip().upper()
        timestamp_format = "%Y-%m-%dT%H:%M:%S"
        self._datareq.params = {
            "mnemonic": mnemonic,
            "s_time": starttime.strftime(timestamp_format),
            "e_time": endtime.strftime(timestamp_format),
        }

        # The response is stored before the status check so that it remains
        # available for inspection even when the check raises.
        self.response = self._send_request(self._datareq)
        self.response.raise_for_status()
        logger.debug("Response: %s", self.response)
        logger.debug("Response test: %s", self.response.text)

        # Check for explicit errors from the service.
        payload = self._decode(self.response)
        if (detail := payload.get("detail")) is not None:
            raise ValueError(f"Failure retrieving mnemonic {mnemonic}: {detail}")

        # Convert to table. Everything other than the rows becomes table meta.
        data = payload.pop("Data")
        table = Table(rows=data, meta=payload)

        # Create a column MJD that has the MJD version of the data
        if len(table):
            obstime = Time(table["ObsTime"])
            table["MJD"] = obstime.mjd

        return table

    def _send_request(self, request):
        """
        Prepare and send a request through the configured session.

        Parameters
        ----------
        request : requests.Request
            The request to send; its `params` must already be set.

        Returns
        -------
        response : requests.Response
            The raw response. The status is not checked here.
        """
        prepped = self._session.prepare_request(request)
        # Sending a prepared request bypasses the session's usual handling of
        # environment-derived settings, so they are merged in explicitly.
        settings = self._session.merge_environment_settings(
            prepped.url, {}, None, None, None
        )
        logger.debug("Query: %s", prepped.url)
        return self._session.send(prepped, timeout=self.timeout, **settings)

    @staticmethod
    def _decode(response):
        """
        Decode the body of a service response into Python objects.

        The body is parsed as JSON first, since JSON tokens such as `null`,
        `true` and `false` are not Python literals. If that fails, the body
        is evaluated as a Python literal, which is how it was always parsed
        previously.

        Parameters
        ----------
        response : requests.Response
            The response to decode.

        Returns
        -------
        payload : object
            The decoded body.
        """
        try:
            return response.json()
        except ValueError:
            # JSON decoding errors are ValueError subclasses.
            return literal_eval(response.text)

    def _set_session(self):
        """Set up HTTP session."""
        headers = dict()
        if self.token:
            if self.rsdp_auth:
                headers["x-asb-auth"] = self.token
            else:
                headers["Authorization"] = f"token {self.token}"

        self._datareq = requests.Request(
            method="GET",
            url=self.eng_base_url + self.data_uri,
            headers=headers,
        )
        self._metareq = requests.Request(
            method="GET",
            url=self.eng_base_url + self.meta_uri,
            headers=headers,
        )

        session = requests.Session()
        retry_policy = Retry(
            total=self.retries,
            backoff_factor=1.0,
            status_forcelist=FORCE_STATUSES,
            raise_on_status=True,
        )
        # Mount for both schemes so the retry policy also applies when the
        # base URL has been overridden with a plain-http address.
        adapter = HTTPAdapter(max_retries=retry_policy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)

        self._session = session

    def __repr__(self):
        """What am I"""
        return f"{self.__class__.__name__}(eng_base_url='{self.eng_base_url}')"