Source code for specprodDB.tile

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
specprodDB.tile
===============

Code for loading one or more separate tiles into the spectroscopic production database.

Notes
-----

Some of this code may be combined or otherwise refactored with :mod:`specprodDB.load`
in the future.
"""
import os
from configparser import ConfigParser

import numpy as np

from astropy.table import Table, join

from sqlalchemy.exc import IntegrityError

from desiutil.iers import freeze_iers
# from desiutil.names import radec_to_desiname
from desiutil.log import get_logger, DEBUG, INFO

from desispec.io.meta import findfile
from desispec.io.photo import gather_targetphot, gather_tractorphot
from desispec.scripts.zcatalog import read_redrock
from desispec.zcatalog import find_primary_spectra

from . import load as db
from .util import no_sky, common_options


_fiberassign_cache = dict()


[docs] def fiberassign_file(tileid): """Find a fiberassign file associated with `tileid`. Parameters ---------- tileid : :class:`int` The Tile ID. Returns ------- :class:`str` The path to the fiberassign file corresponding to tileid. """ global _fiberassign_cache if tileid not in _fiberassign_cache: f = findfile('fiberassignsvn', tile=tileid, readonly=True) _fiberassign_cache[tileid] = f return _fiberassign_cache[tileid]
[docs] def potential_targets(tileid): """Find potential targets associated with `tileid`. Sky targets are not returned. Parameters ---------- tileid : :class:`int` The Tile ID. Returns ------- :class:`~astropy.table.Table` A table containing potential target information. """ potential_targets_table = Table.read(fiberassign_file(tileid), format='fits', hdu='TARGETS') db.log.debug("Found %d potential targets.", len(potential_targets_table)) no_sky_rows = no_sky(potential_targets_table) potential_targets_table = Table(potential_targets_table[no_sky_rows]) db.log.debug("%d potential targets remain after removing sky targets.", len(potential_targets_table)) return potential_targets_table
[docs] def potential_photometry(tile, targets): """Assemble a Table of targets that will be used to find photometric data. `targets` is assumed to come from one tile that has not already been loaded. Any existing photometry already loaded will be excluded from the returned list. Parameters ---------- tile : :class:`~specprodDB.load.Tile` The tile associated with `targets`. targets : :class:`~astropy.table.Table` Effectively a list of ``TARGETID``. Returns ------- :class:`~astropy.table.Table` A Table that will be the input to photometric search functions. """ db.log.debug("Checking for existing photometry.") potential_tractorphot_already_loaded = db.dbSession.query(db.Photometry.targetid).filter(db.Photometry.targetid.in_(targets['TARGETID'].tolist())).all() potential_tractorphot_not_already_loaded = np.ones((len(targets),), dtype=bool) if len(potential_tractorphot_already_loaded) > 0: db.log.info("Removing %d objects already loaded.", len(potential_tractorphot_already_loaded)) for row in potential_tractorphot_already_loaded: potential_tractorphot_not_already_loaded[targets['TARGETID'] == row[0]] = False potential_cat = Table() potential_cat['TARGETID'] = targets['TARGETID'][potential_tractorphot_not_already_loaded] potential_cat['TILEID'] = tile.tileid potential_cat['TARGET_RA'] = targets['RA'][potential_tractorphot_not_already_loaded] potential_cat['TARGET_DEC'] = targets['DEC'][potential_tractorphot_not_already_loaded] # potential_cat['PETAL_LOC'] = targets['PETAL_LOC'][potential_tractorphot_not_already_loaded] potential_cat['SURVEY'] = tile.survey potential_cat['PROGRAM'] = tile.program return potential_cat
[docs] def targetphot(catalog): """Find the target data associated with the targets in `catalog`. Parameters ---------- catalog : :class:`~astropy.table.Table` A list of objects. Returns ------- :class:`~astropy.table.Table` A Table containing the targeting data. """ db.log.debug("Starting gather_targetphot(); %d objects in input catalog.", len(catalog)) potential_targetphot = gather_targetphot(catalog, racolumn='TARGET_RA', deccolumn='TARGET_DEC') db.log.debug("Finished with gather_targetphot(); %d objects found.", len(potential_targetphot)) potential_targetphot['SURVEY'] = catalog['SURVEY'] potential_targetphot['PROGRAM'] = catalog['PROGRAM'] potential_targetphot['TILEID'] = catalog['TILEID'] inan = np.logical_or(np.isnan(potential_targetphot['PMRA']), np.isnan(potential_targetphot['PMDEC'])) if np.any(inan): potential_targetphot['PMRA'][inan] = 0.0 potential_targetphot['PMDEC'][inan] = 0.0 return potential_targetphot
[docs] def tractorphot(catalog): """Find the photometry data associated with the targets in `catalog`. Parameters ---------- catalog : :class:`~astropy.table.Table` A list of objects. Returns ------- :class:`~astropy.table.Table` A Table containing the photometry data. """ db.log.debug("Starting gather_tractorphot(); %d objects in input catalog.", len(catalog)) potential_tractorphot = gather_tractorphot(catalog, racolumn='TARGET_RA', deccolumn='TARGET_DEC') db.log.debug("Finished with gather_tractorphot(); %d objects found.", len(potential_tractorphot)) assert (np.where(potential_tractorphot['RELEASE'] == 0)[0] == np.where(potential_tractorphot['BRICKNAME'] == '')[0]).all() return potential_tractorphot
[docs] def load_photometry(photometry): """Insert the data in `photometry` into the database. Parameters ---------- photometry : :class:`~astropy.table.Table` A Table containing the photometry data. Returns ------- :class:`list` A list of :class:`~specprodDB.load.Photometry` objects loaded. """ row_index = np.where(photometry['BRICKNAME'] != '')[0] load_photometry = db.Photometry.convert(photometry, row_index=row_index) if len(load_photometry) > 0: # db.dbSession.rollback() db.dbSession.add_all(load_photometry) db.dbSession.commit() db.log.info("Loaded %d rows of Photometry data.", len(load_photometry)) else: db.log.info("No Photometry data to load.") return load_photometry
[docs] def load_targetphot(targetphot, loaded_photometry): """Load the photometry, such as it is, for objects that do not have Tractor photometry. Parameters ---------- targetphot : :class:`~astropy.table.Table` A Table containing the targeting data. loaded_photometry : :class:`list` A list of :class:`~specprodDB.load.Photometry` objects already loaded. Returns ------- :class:`list` A list of :class:`~specprodDB.load.Photometry` objects loaded. """ # # Find TARGETID not already *just* loaded. # load_rows = np.zeros((len(targetphot),), dtype=bool) if len(loaded_photometry) > 0: loaded_targetid = Table() loaded_targetid['TARGETID'] = np.array([r.targetid for r in loaded_photometry]) loaded_targetid['LS_ID'] = np.array([r.ls_id for r in loaded_photometry]) j = join(targetphot['TARGETID', 'RELEASE'], loaded_targetid, join_type='left', keys='TARGETID') try: load_targetids = j['TARGETID'][j['LS_ID'].mask] except AttributeError: # # This means *every* TARGETID is already loaded. # pass else: unique_targetid, targetid_index = np.unique(targetphot['TARGETID'].data, return_index=True) for t in load_targetids: load_rows[targetid_index[unique_targetid == t]] = True # # Find TARGETID not loaded in previous cycles. # targetphot_already_loaded = db.dbSession.query(db.Photometry.targetid).filter(db.Photometry.targetid.in_(targetphot[load_rows]['TARGETID'].tolist())).all() targetphot_not_already_loaded = np.ones((len(targetphot),), dtype=bool) for row in targetphot_already_loaded: targetphot_not_already_loaded[targetphot['TARGETID'] == row[0]] = False row_index = np.where(load_rows & targetphot_not_already_loaded)[0] load_targetphot = db.Photometry.convert(targetphot, row_index=row_index) if len(load_targetphot) > 0: # db.dbSession.rollback() db.dbSession.add_all(load_targetphot) db.dbSession.commit() db.log.info("Loaded %d rows of Photometry data (from targeting).", len(load_targetphot)) else: db.log.info("No Photometry data (from targeting) to load.") return load_targetphot
[docs] def load_target(tile, target): """Load the targeting data associated with `tile`. Parameters ---------- tile : :class:`~specprodDB.load.Tile` The tile associated with `target`. target : :class:`~astropy.table.Table` Effectively a list of ``TARGETID``. Returns ------- :class:`list` A list of :class:`~specprodDB.load.Target` objects loaded. """ load_target = db.Target.convert(target, tile.survey, tile.tileid) if len(load_target) > 0: # db.dbSession.rollback() db.dbSession.add_all(load_target) db.dbSession.commit() db.log.info("Loaded %d rows of Target data.", len(load_target)) else: db.log.info("No Target data to load.") return load_target
[docs] def load_redshift(tile, spgrp='cumulative'): """Load redshift data associated with `tile`. Parameters ---------- tile : :class:`~specprodDB.load.Tile` The tile with redshifts to load. spgrp : :class:`str`, optional The type of redshift data to load. Currently only 'cumulative' is supported. Returns ------- :class:`list` A list of :class:`~specprodDB.load.Ztile` objects loaded. Raises ------ ValueError If the value of `spgrp` is not supported. """ if spgrp not in ('cumulative',): msg = 'Unsupported spgrp value: "%s"!' db.log.critical(msg, spgrp) raise ValueError(msg % (spgrp,)) redrock_files = list() for spectrograph in range(10): redrock_file, redrock_exists = findfile('redrock_tile', groupname=spgrp, tile=tile.tileid, spectrograph=spectrograph, night=tile.lastnight, readonly=True, return_exists=True) if redrock_exists: redrock_files.append(redrock_file) else: zbest_file = redrock_file.replace('redrock', 'zbest') if os.path.exists(zbest_file): db.log.info('Using %s instead of %s.', os.path.basename(zbest_file), os.path.basename(redrock_file)) redrock_files.append(zbest_file) if len(redrock_files) == 0: db.log.warning("No %s redrock or zbest files found for tile %d!", spgrp, tile.tileid) return [] load_ztile = list() for rr in redrock_files: redrock_table, expfibermap = read_redrock(rr, group=spgrp, recoadd_fibermap=True, pertile=True) assert (expfibermap['TILEID'] == tile.tileid).all() # # In non-daily specprod, firstnight is a minimum over all petals. # However here, we are doing a minimum over one petal. # Compare the FIRSTNIGHT calculation in desispec.scripts.zcatalog. # firstnight = np.min(expfibermap['NIGHT']).tolist() row_index = no_sky(redrock_table) load_ztile += db.Ztile.convert(redrock_table, tile.survey, tile.program, tile.tileid, firstnight, row_index=row_index) if len(load_ztile) > 0: statement = db.upsert(load_ztile) db.dbSession.execute(statement) # db.dbSession.add_all(load_ztile) db.dbSession.commit() db.log.info("Loaded %d rows of Ztile data.", len(load_ztile)) else: db.log.info("No Ztile data to load.") return load_ztile
[docs] def load_fiberassign(tile): """Load the fiber assignments and potential assignments for `tile`. Parameters ---------- tile : :class:`~specprodDB.load.Tile` The tile with fiber assignments to load. Returns ------- :class:`tuple` A tuple containing the lists of :class:`~specprodDB.load.Fiberassign` and :class:`~specprodDB.load.Potential` objects loaded. """ fiberassign_table = Table.read(fiberassign_file(tile.tileid), format='fits', hdu='FIBERASSIGN') potential_table = Table.read(fiberassign_file(tile.tileid), format='fits', hdu='POTENTIAL_ASSIGNMENTS') row_index = no_sky(fiberassign_table) load_fiberassign = db.Fiberassign.convert(fiberassign_table, tile.tileid, row_index=row_index) if len(load_fiberassign) > 0: db.dbSession.add_all(load_fiberassign) db.dbSession.commit() db.log.info("Loaded %d rows of Fiberassign data.", len(load_fiberassign)) else: db.log.info("No Fiberassign data to load.") row_index = no_sky(potential_table) load_potential = db.Potential.convert(potential_table, tile.tileid, row_index=row_index) if len(load_potential) > 0: db.dbSession.add_all(load_potential) db.dbSession.commit() db.log.info("Loaded %d rows of Potential data.", len(load_potential)) else: db.log.info("No Potential data to load.") return (load_fiberassign, load_potential)
[docs] def update_primary(): """Update the primary classification after some number of tiles has been loaded. """ zall_tilecumulative = db.dbSession.query(db.Ztile).all() zall_table = Table(list(zip(*[(z.targetid, z.zwarn, z.tsnr2_lrg) for z in zall_tilecumulative])), names=('TARGETID', 'ZWARN', 'TSNR2_LRG')) nspec, primary = find_primary_spectra(zall_table) zcat_nspec, zcat_primary = nspec.tolist(), primary.tolist() for k, z in enumerate(zall_tilecumulative): z.zcat_nspec = zcat_nspec[k] z.zcat_primary = zcat_primary[k] db.dbSession.commit() db.log.info("Updated primary classification for %d Ztile objects.", len(zall_tilecumulative)) sv_tilecumulative = db.dbSession.query(db.Ztile).filter(db.Ztile.survey.in_(('sv1', 'sv2', 'sv3'))).all() if len(sv_tilecumulative) > 0: sv_table = Table(list(zip(*[(z.targetid, z.zwarn, z.tsnr2_lrg) for z in sv_tilecumulative])), names=('TARGETID', 'ZWARN', 'TSNR2_LRG')) nspec, primary = find_primary_spectra(sv_table) sv_nspec, sv_primary = nspec.tolist(), primary.tolist() for k, z in enumerate(sv_tilecumulative): z.sv_nspec = sv_nspec[k] z.sv_primary = sv_primary[k] db.dbSession.commit() db.log.info("Updated primary classification for %d SV Ztile objects.", len(sv_tilecumulative)) else: db.log.info("No SV Ztile objects to update.") main_tilecumulative = db.dbSession.query(db.Ztile).filter(db.Ztile.survey.in_(('main', ))).all() if len(main_tilecumulative) > 0: main_table = Table(list(zip(*[(z.targetid, z.zwarn, z.tsnr2_lrg) for z in main_tilecumulative])), names=('TARGETID', 'ZWARN', 'TSNR2_LRG')) nspec, primary = find_primary_spectra(main_table) main_nspec, main_primary = nspec.tolist(), primary.tolist() for k, z in enumerate(main_tilecumulative): z.main_nspec = main_nspec[k] z.main_primary = main_primary[k] db.dbSession.commit() db.log.info("Updated primary classification for %d Main Ztile objects.", len(main_tilecumulative)) else: db.log.info("No Main Ztile objects to update.") return
[docs] def update_q3c(): """Update the q3c indexes after some number of tiles has been loaded. """ q3c_updates = {'tile': 'tilera', 'exposure': 'tilera', 'photometry': 'ra', 'fiberassign': 'target_ra'} for table in q3c_updates: db.q3c_index(table, ra=q3c_updates[table]) return
[docs] def get_options(description="Load data for one tile into a specprod database."): """Parse command-line options. Parameters ---------- description : :class:`str`, optional Override the description in the command-line help. Returns ------- :class:`argparse.Namespace` The parsed options. """ prsr = common_options(description) prsr.add_argument('-e', '--exposures-file', action='store', dest='exposures_file', metavar='FILE', help='Override the top-level exposures file associated with a specprod.') prsr.add_argument('-p', '--primary', action='store_true', dest='primary', help='Update primary redshift values and indexes for all tiles.') prsr.add_argument('-t', '--tiles-file', action='store', dest='tiles_file', metavar='FILE', help='Override the top-level tiles file associated with a specprod.') prsr.add_argument('-u', '--update', action='store_true', dest='update', help='Specify that this is an update to an already-loaded tile.') prsr.add_argument('tile', metavar='TILEID', type=int, help='Load TILEID.') options = prsr.parse_args() return options
[docs] def main(): """Entry point for command-line script. Returns ------- :class:`int` An integer suitable for passing to :func:`sys.exit`. """ # # command-line arguments. # options = get_options("Load data for one tile into a specprod database.") # # Logging. # log_level = DEBUG if options.verbose else INFO db.log = get_logger(log_level, timestamp=True) # # Cache specprod value. # try: specprod = os.environ['SPECPROD'] except KeyError: db.log.critical("Environment variable SPECPROD is not defined!") return 1 # # Read configuration file. # config = ConfigParser() r = config.read(options.config) if not (r and r[0] == options.config): db.log.critical("Failed to read configuration file: %s!", options.config) return 1 if specprod not in config: db.log.critical("Configuration has no section for '%s'!", specprod) return 1 # # Initialize DB. # freeze_iers() postgresql = db.setup_db(hostname=config[specprod]['hostname'], username=config[specprod]['username'], schema=options.schema, overwrite=options.overwrite, public=options.public, verbose=options.verbose) # # Load configuration. # release = config[specprod]['release'] photometry_version = config[specprod]['photometry'] # target_summary = config[specprod].getboolean('target_summary') rsv = config[specprod]['redshift'].split('/') if len(rsv) == 2: redshift_type, redshift_version = rsv[0], rsv[1] else: redshift_type, redshift_version = rsv[0], 'v0' tiles_version = config[specprod]['tiles'] # # Complete initialization. # if options.overwrite: db.load_versions(photometry_version, f"{redshift_type}/{redshift_version}", release, specprod, tiles_version) # # Find the tile in the top-level tiles file. # if options.tiles_file is None: tiles_file = findfile('tiles', readonly=True) else: tiles_file = options.tiles_file tiles_table = Table.read(tiles_file, format='fits', hdu='TILES') row_index = np.where(tiles_table['TILEID'] == options.tile)[0] if len(row_index) == 1: candidate_tiles = db.Tile.convert(tiles_table, row_index=row_index) elif len(row_index) > 1: db.log.critical("Multiple matching tiles found in %s for tile %d!", tiles_file, options.tile) return 1 else: db.log.critical("No matching tiles found in %s for tile %d!", tiles_file, options.tile) return 1 # # Find the associated exposures. # if options.exposures_file is None: exposures_file = findfile('exposures', readonly=True) else: exposures_file = options.exposures_file exposures_table = Table.read(exposures_file, format='fits', hdu='EXPOSURES') row_index = np.where((exposures_table['TILEID'] == candidate_tiles[0].tileid) & (exposures_table['EFFTIME_SPEC'] > 0))[0] if len(row_index) > 0: load_exposures = db.Exposure.convert(exposures_table, row_index=row_index) else: db.log.critical("No valid exposures found for tile %d, even though EFFTIME_SPEC == %f!", candidate_tiles[0].tileid, candidate_tiles[0].efftime_spec) return 1 frames_table = Table.read(exposures_file, format='fits', hdu='FRAMES') load_frames = list() for exposure in load_exposures: row_index = np.where(frames_table['EXPID'] == exposure.expid)[0] assert len(row_index) > 0 load_frames += db.Frame.convert(frames_table, row_index=row_index) try: statement = db.upsert(candidate_tiles) db.dbSession.execute(statement) # db.dbSession.add_all(candidate_tiles) db.dbSession.commit() except IntegrityError as exc: # # IntegrityError is thrown when a tile is already loaded, but also when # a NOT NULL constraint is violated. # db.log.critical("Tile %d cannot be loaded!", candidate_tiles[0].tileid) db.log.critical("Message was: %s", exc.args[0]) db.dbSession.rollback() return 1 new_tile = candidate_tiles[0] try: statement = db.upsert(load_exposures) db.dbSession.execute(statement) # db.dbSession.add_all(load_exposures) db.dbSession.commit() except IntegrityError as exc: db.log.critical("Exposures for tile %d cannot be loaded!", new_tile.tileid) db.log.critical("Message was: %s", exc.args[0]) db.dbSession.rollback() db.dbSession.delete(new_tile) db.dbSession.commit() return 1 statement = db.upsert(load_frames) db.dbSession.execute(statement) # db.dbSession.add_all(load_frames) db.dbSession.commit() # # Load photometry. If this is an update, these should already be loaded. # if not options.update: potential_targets_table = potential_targets(new_tile.tileid) potential_cat = potential_photometry(new_tile, potential_targets_table) potential_targetphot = targetphot(potential_cat) potential_tractorphot = tractorphot(potential_cat) loaded_photometry = load_photometry(potential_tractorphot) loaded_targetphot = load_targetphot(potential_targetphot, loaded_photometry) # # Load targeting table. # loaded_target = load_target(new_tile, potential_targetphot) # # Load fiberassign and potential. # loaded_fiberassign, loaded_potential = load_fiberassign(new_tile) # # Load tile/cumulative redshifts. # loaded_ztile = load_redshift(new_tile) # # Update global values, if requested. # if options.primary: update_primary() update_q3c() # # Clean up. # db.dbSession.close() db.engine.dispose() return 0