Source code for scopesim.source.source

"""
# old functionality to implement:
# - provide x, y, lam, spectra, weight, ref
# - overridden + : number, Source, SourceSpectrum
# - overridden * : number, SpectralElement
# - write to and read from file
# - shift all fields
# - rotate around the centre
# - photons_in_range returns the photons per spectrum in a wavelength range
# - image_in_range returns an image of the source for a wavelength range
#
# old functionality which will be removed:
# - project_onto_chip
# - apply_optical_train
#
# old structure --> new structure:
# - all data held in 6 arrays
# --> new dicts for fields, spectrum
#       field can be a Table or an ImageHDU
#       spectrum is a SourceSpectrum
#
# Use cases:
# image + spectrum
# images + spectra
# table + spectrum
# table + spectra
#
# table columns = x, y, spec_id, weight
# table meta keywords = x_unit, y_unit
#
# image header keywords = WCS, SPEC_ID, WEIGHT
# [WCS = CRPIXn, CRVALn = (0,0), CTYPEn, CDn_m, NAXISn, CUNITn
"""

import pickle
from copy import deepcopy
from pathlib import Path
import numpy as np

from astropy.table import Table, Column
from astropy.io import ascii as ioascii
from astropy.io import fits
from astropy import units as u
from astropy.wcs import WCS

from synphot import SpectralElement

from ..optics.image_plane import ImagePlane
from ..optics import image_plane_utils as imp_utils
from .source_utils import validate_source_input, convert_to_list_of_spectra, \
    photons_in_range
from . import source_templates as src_tmp

from ..base_classes import SourceBase
from ..utils import (find_file, is_fits, get_fits_type, quantify,
                     quantity_from_table, convert_table_comments_to_dict,
                     close_loop, figure_factory, get_logger)


logger = get_logger(__name__)


[docs]class Source(SourceBase): """ Create a source object from a file or from arrays A Source object must consist of a spatial and a spectral description of the on-sky source. Many sources can be added together and kept in memory as a single Source object. The spatial descriptions are kept in the ``<Source>.fields`` list, while the spectral descriptions are in the ``<Source>.spectra`` list. The spatial description can be built from any combination of: * a list of arrays (like in SimCADO >v0.5) * astropy Table objects * astropy ImageHDU objects * on disk FITS files * on disk ASCII tables The spectral descriptions can be passed as either ``synphot.SourceSpectrum`` objects, or a set of two equal length arrays for wavelength and flux. .. hint:: Initialisation parameter combinations include: New ScopeSim-style input - ``table=<astropy.Table>, spectra=<list of synphot.SourceSpectrum>`` - ``table=<astropy.Table>, lam=<array>, spectra=<list of array>`` - ``image_hdu=<fits.ImageHDU>, spectra=<list of synphot.SourceSpectrum>`` - ``image_hdu=<fits.ImageHDU>, lam=<array>, spectra=<list of array>`` - ``image_hdu=<fits.ImageHDU>, flux=<astropy.Quantity>`` Old SimCADO-style input - ``x=<array>, y=<array>, ref=<array>, spectra=<list of synphot.SourceSpectrum>`` - ``x=<array>, y=<array>, ref=<array>, spectra=<list of array>, lam=<array>`` - ``x=<array>, y=<array>, ref=<array>, weight=<array>, spectra=<list of array>, lam=<array>`` More details on the content of these combinations can be found in the use-case documentation. Parameters ---------- filename : str lam : np.array [um] Wavelength bins of length (m) spectra : list of synphot.SourceSpectra [ph/s/cm2/AA] x, y : np.array [arcsec] coordinates of where the emitting files are relative to the centre of the field of view ref : np.array the index for .spectra which connects a position (x, y) to a spectrum ``flux(x[i], y[i]) = spectra[ref[i]] * weight[i]`` weight : np.array A weighting to scale the relevant spectrum for each position table : astropy.Table image_hdu : fits.ImageHDU [arcsec-2] The .data array is simply a map of weights for the assiciated spectrum referenced by .header["SPEC_REF]. Surface brightness values are assumed to be per arcsec2 flux : astropy.Quantity [u.mag, u.ABmag, u.Jy] Flux values are converted to a reference spectrum that is referenced by image_hdu.header["SPEC_REF"]. flux can only be used in conjuction with image_hdu Attributes ---------- fields : list The spatial distribution of the on-sky source, either as ``fits.ImageHDU`` or ``astropy.Table`` objects spectra : list of ``synphot.SourceSpectrum`` objects List of spectra associated with the fields meta : dict Dictionary of extra information about the source See Also -------- synphot : ``https://synphot.readthedocs.io/en/latest/`` """ def __init__(self, filename=None, cube=None, ext=0, lam=None, spectra=None, x=None, y=None, ref=None, weight=None, table=None, image_hdu=None, flux=None, **kwargs): self.meta = {} self.meta.update(kwargs) # ._meta_dicts contains a meta for each of the .fields. It is primarily # used to set proper FITS header keywords for each field so the source # can be reconstructed from the FITS headers. self._meta_dicts = [self.meta] self.fields = [] self.spectra = [] self.bandpass = None validate_source_input(lam=lam, x=x, y=y, ref=ref, weight=weight, spectra=spectra, table=table, cube=cube, ext=ext, image_hdu=image_hdu, flux=flux, filename=filename) if spectra is not None: spectra = convert_to_list_of_spectra(spectra, lam) if filename is not None and spectra is not None: self._from_file(filename, spectra, flux) elif cube is not None: self._from_cube(cube=cube, ext=ext) elif table is not None and spectra is not None: self._from_table(table, spectra) elif image_hdu is not None and spectra is not None: self._from_imagehdu_and_spectra(image_hdu, spectra) elif image_hdu is not None and flux is not None: self._from_imagehdu_and_flux(image_hdu, flux) elif image_hdu is not None and flux is None and spectra is None: if image_hdu.header.get("BUNIT") is not None: self._from_imagehdu_only(image_hdu) else: msg = ("image_hdu must be accompanied by either spectra or flux:\n" f"spectra: {spectra}, flux: {flux}") logger.exception(msg) raise ValueError(msg) elif x is not None and y is not None and \ ref is not None and spectra is not None: self._from_arrays(x, y, ref, weight, spectra) def _from_file(self, filename, spectra, flux): filename = find_file(filename) if is_fits(filename): fits_type = get_fits_type(filename) data = fits.getdata(filename) hdr = fits.getheader(filename) hdr["FILENAME"] = Path(filename).name if fits_type == "image": image = fits.ImageHDU(data=data, header=hdr) if spectra is not None: self._from_imagehdu_and_spectra(image, spectra) elif flux is not None: self._from_imagehdu_and_flux(image, flux) else: self._from_imagehdu_only(image) elif fits_type == "bintable": hdr1 = fits.getheader(filename, 1) hdr.update(hdr1) tbl = Table(data, meta=dict(hdr)) tbl.meta.update(convert_table_comments_to_dict(tbl)) self._from_table(tbl, spectra) else: tbl = ioascii.read(filename) tbl.meta.update(convert_table_comments_to_dict(tbl)) self._from_table(tbl, spectra) def _from_table(self, tbl, spectra): if "weight" not in tbl.colnames: tbl.add_column(Column(name="weight", data=np.ones(len(tbl)))) tbl["ref"] += len(self.spectra) self.fields.append(tbl) self.spectra += spectra def _from_imagehdu_and_spectra(self, image_hdu, spectra): if not image_hdu.header.get("BG_SRC"): pass # FIXME: This caused more problems than it solved! # Find out if there's a good reason to mess with this, # otherwise just remove... # image_hdu.header["CRVAL1"] = 0 # image_hdu.header["CRVAL2"] = 0 # image_hdu.header["CRPIX1"] = image_hdu.header["NAXIS1"] / 2 # image_hdu.header["CRPIX2"] = image_hdu.header["NAXIS2"] / 2 # #image_hdu.header["CRPIX1"] = (image_hdu.header["NAXIS1"] + 1) / 2 # #image_hdu.header["CRPIX2"] = (image_hdu.header["NAXIS2"] + 1) / 2 # # .. todo:: find where the actual problem is with negative CDELTs # # .. todo:: --> abs(pixel_scale) in header_from_list_of_xy # if image_hdu.header["CDELT1"] < 0: # image_hdu.header["CDELT1"] *= -1 # image_hdu.data = image_hdu.data[:, ::-1] # if image_hdu.header["CDELT2"] < 0: # image_hdu.header["CDELT2"] *= -1 # image_hdu.data = image_hdu.data[::-1, :] if isinstance(image_hdu, fits.PrimaryHDU): image_hdu = fits.ImageHDU(data=image_hdu.data, header=image_hdu.header) if spectra is not None and len(spectra) > 0: image_hdu.header["SPEC_REF"] = len(self.spectra) self.spectra += spectra else: image_hdu.header["SPEC_REF"] = "" logger.warning("No spectrum was provided. SPEC_REF set to ''. " "This could cause problems later") raise NotImplementedError for i in [1, 2]: # Do not test for CUNIT or CDELT so that it throws an exception unit = u.Unit(image_hdu.header["CUNIT"+str(i)].lower()) val = float(image_hdu.header["CDELT"+str(i)]) image_hdu.header["CUNIT"+str(i)] = "deg" image_hdu.header["CDELT"+str(i)] = val * unit.to(u.deg) self.fields.append(image_hdu) def _from_imagehdu_and_flux(self, image_hdu, flux): if isinstance(flux, u.Unit): flux = 1 * flux spec_template = src_tmp.vega_spectrum if isinstance(flux, u.Quantity): if flux.unit.physical_type == "spectral flux density": # ABmag and Jy spec_template = src_tmp.ab_spectrum flux = flux.to(u.ABmag) flux = flux.value spectra = [spec_template(flux)] self._from_imagehdu_and_spectra(image_hdu, spectra) def _from_imagehdu_only(self, image_hdu): bunit = image_hdu.header.get("BUNIT") try: bunit = u.Unit(bunit) except ValueError: logger.error( "Astropy cannot parse BUNIT [%s].\n" "You can bypass this check by passing an astropy Unit to the " "flux parameter:\n" ">>> Source(image_hdu=..., flux=u.Unit(bunit), ...)", bunit) value = 0 if bunit in [u.mag, u.ABmag] else 1 self._from_imagehdu_and_flux(image_hdu, value * bunit) def _from_arrays(self, x, y, ref, weight, spectra): if weight is None: weight = np.ones(len(x)) x = quantify(x, u.arcsec) y = quantify(y, u.arcsec) tbl = Table(names=["x", "y", "ref", "weight"], data=[x, y, np.array(ref) + len(self.spectra), weight]) tbl.meta["x_unit"] = "arcsec" tbl.meta["y_unit"] = "arcsec" self.fields.append(tbl) self.spectra += spectra def _from_cube(self, cube, ext=0): """ Parameters ---------- cube: a file, HDUList or a PrimaryHDU object containing the cube ext: int the extension where the cube is located if applicable. """ if isinstance(cube, fits.HDUList): data = cube[ext].data header = cube[ext].header wcs = WCS(cube[ext], fobj=cube) elif isinstance(cube, (fits.PrimaryHDU, fits.ImageHDU)): data = cube.data header = cube.header wcs = WCS(cube) else: with fits.open(cube) as hdul: data = hdul[ext].data header = hdul[ext].header header["FILENAME"] = Path(cube).name wcs = WCS(cube) try: bunit = header["BUNIT"] u.Unit(bunit) except KeyError: bunit = "erg / (s cm2 arcsec2)" logger.warning( "Keyword \"BUNIT\" not found, setting to %s by default", bunit) except ValueError as error: logger.error("\"BUNIT\" keyword is malformed: %s", error) raise # Compute the wavelength vector. This will be attached to the cube_hdu # as a new `wave` attribute. This is not optimal coding practice. wave = wcs.all_pix2world(header["CRPIX1"], header["CRPIX2"], np.arange(data.shape[0]), 0)[-1] wave = (wave * u.Unit(wcs.wcs.cunit[-1])).to(u.um, equivalencies=u.spectral()) # WCS keywords must be updated because astropy.wcs converts wavelengths to 'm' header.update(wcs.to_header()) target_cube = data target_hdr = header.copy() target_hdr["BUNIT"] = bunit cube_hdu = fits.ImageHDU(data=target_cube, header=target_hdr) cube_hdu.wave = wave # ..todo: review wave attribute, bad practice self.fields.append(cube_hdu) @property def table_fields(self): """List of fields that are defined through tables""" fields = [field for field in self.fields if isinstance(field, Table)] return fields @property def image_fields(self): """List of fields that are defined through two-dimensional images""" fields = [field for field in self.fields if isinstance(field, fits.ImageHDU) and field.header["NAXIS"] == 2] return fields @property def cube_fields(self): """List of fields that are defined through three-dimensional cubes""" fields = [field for field in self.fields if isinstance(field, fits.ImageHDU) and field.header["NAXIS"] == 3] return fields # ..todo: rewrite this method
[docs] def image_in_range(self, wave_min, wave_max, pixel_scale=1*u.arcsec, layers=None, area=None, spline_order=1, sub_pixel=False): if layers is None: layers = range(len(self.fields)) fields = [self.fields[ii] for ii in layers] hdr = imp_utils.get_canvas_header(fields, pixel_scale=pixel_scale) im_plane = ImagePlane(hdr) for field in fields: if isinstance(field, Table): fluxes = self.photons_in_range(wave_min, wave_max, area, field["ref"]) * field["weight"] x = quantity_from_table("x", field, u.arcsec) y = quantity_from_table("y", field, u.arcsec) tbl = Table(names=["x", "y", "flux"], data=[x, y, fluxes]) tbl.meta.update(field.meta) hdu_or_table = tbl elif isinstance(field, fits.ImageHDU): if field.header["SPEC_REF"] != "": ref = [field.header["SPEC_REF"]] flux = self.photons_in_range(wave_min, wave_max, area, ref) # [ph s-1] or [ph s-1 m-2] come out of photons_in_range # ## ..todo: CATCH UNITS HERE. DEAL WITH THEM PROPERLY # Currently assuming that all images are scaled appropriately # and that they have SPEC_REF # else: # field = scale_imagehdu(field, area=area, # solid_angle=pixel_scale**2, # waverange=(wave_min, wave_max)) # # [ph s-1] or [ph s-1 m-2] come out of photons_in_range # flux = 1 image = field.data * flux hdu = fits.ImageHDU(header=field.header, data=image) hdu_or_table = hdu else: continue im_plane.add(hdu_or_table, sub_pixel=sub_pixel, spline_order=spline_order) return im_plane
[docs] def photons_in_range(self, wave_min, wave_max, area=None, indexes=None): """ Parameters ---------- wave_min : float, u.Quantity [um] wave_max : float, u.Quantity [um] area : float, u.Quantity, optional [m2] indexes : list of integers, optional Returns ------- counts : u.Quantity list [ph / s / m2] if area is None [ph / s] if area is passed """ if indexes is None: indexes = range(len(self.spectra)) spectra = [self.spectra[ii] for ii in indexes] counts = photons_in_range(spectra, wave_min, wave_max, area=area, bandpass=self.bandpass) return counts
[docs] def fluxes(self, wave_min, wave_max, **kwargs): return self.photons_in_range(wave_min, wave_max, **kwargs)
[docs] def image(self, wave_min, wave_max, **kwargs): return self.image_in_range(wave_min, wave_max, **kwargs)
[docs] @classmethod def load(cls, filename): """Load :class:'.Source' object from filename""" with open(filename, "rb") as fp1: src = pickle.load(fp1) return src
[docs] def dump(self, filename): """Save to filename as a pickle""" with open(filename, "wb") as fp1: pickle.dump(self, fp1)
# def collapse_spectra(self, wave_min=None, wave_max=None): # for spec in self.spectra: # waves = spec.waveset # if wave_min is not None and wave_max is not None: # mask = (waves >= wave_min) * (waves <= wave_max) # waves = waves[mask] # fluxes = spec(waves) # spec = SourceSpectrum(Empirical1D, points=waves, # lookup_table=fluxes)
[docs] def shift(self, dx=0, dy=0, layers=None): """ Shifts the position of one or more fields w.r.t. the optical axis Parameters ---------- dx, dy : float [arcsec] layers : list of ints which .fields entries to shift """ if layers is None: layers = np.arange(len(self.fields)) for ii in layers: if isinstance(self.fields[ii], Table): x = quantity_from_table("x", self.fields[ii], u.arcsec) x += quantify(dx, u.arcsec) self.fields[ii]["x"] = x y = quantity_from_table("y", self.fields[ii], u.arcsec) y += quantify(dy, u.arcsec) self.fields[ii]["y"] = y elif isinstance(self.fields[ii], (fits.ImageHDU, fits.PrimaryHDU)): dx = quantify(dx, u.arcsec).to(u.deg) dy = quantify(dy, u.arcsec).to(u.deg) self.fields[ii].header["CRVAL1"] += dx.value self.fields[ii].header["CRVAL2"] += dy.value
[docs] def rotate(self, angle, offset=None, layers=None): pass
[docs] def add_bandpass(self, bandpass): if not isinstance(bandpass, SpectralElement): raise ValueError("type(bandpass) must be synphot.SpectralElement") self.bandpass = bandpass
[docs] def plot(self): """ Plot the location of source components Source components instantiated from 2d or 3d ImageHDUs are represented by their spatial footprint. Source components instantiated from tables are shown as points. """ _, axes = figure_factory() colours = "rgbcymk" * (len(self.fields) // 7 + 1) for col, field in zip(colours, self.fields): if isinstance(field, Table): axes.plot(field["x"], field["y"], col+".") elif isinstance(field, (fits.ImageHDU, fits.PrimaryHDU)): xypts = imp_utils.calc_footprint(field.header) convf = u.Unit(field.header["CUNIT1"]).to(u.arcsec) outline = np.array(list(close_loop(xypts))) * convf axes.plot(outline[:, 0], outline[:, 1], col) axes.set_xlabel("x [arcsec]") axes.set_ylabel("y [arcsec]") axes.set_aspect("equal")
[docs] def make_copy(self): new_source = Source() new_source.meta = deepcopy(self.meta) new_source._meta_dicts = deepcopy(self._meta_dicts) new_source.spectra = deepcopy(self.spectra) for field in self.fields: if isinstance(field, (fits.ImageHDU, fits.PrimaryHDU)) \ and field._file is not None: # and field._data_loaded is False: new_source.fields.append(field) else: new_source.fields.append(deepcopy(field)) return new_source
[docs] def append(self, source_to_add): new_source = source_to_add.make_copy() # If there is no field yet, then self._meta_dicts contains a # reference to self.meta, which is empty. This ensures that both are # updated at the same time. However, it is important that the fields # and _meta_dicts match when appending sources. if len(self.fields) == 0: assert self._meta_dicts == [{}] self._meta_dicts = [] if isinstance(source_to_add, Source): for field in new_source.fields: if isinstance(field, Table): field["ref"] += len(self.spectra) self.fields.append(field) elif isinstance(field, (fits.ImageHDU, fits.PrimaryHDU)): if ("SPEC_REF" in field.header and isinstance(field.header["SPEC_REF"], int)): field.header["SPEC_REF"] += len(self.spectra) self.fields.append(field) self.spectra += new_source.spectra self._meta_dicts += source_to_add._meta_dicts else: raise ValueError(f"Cannot add {type(new_source)} object to Source object")
def __add__(self, new_source): self_copy = self.make_copy() self_copy.append(new_source) return self_copy def __radd__(self, new_source): return self.__add__(new_source) def __repr__(self): msg = "" for ifld, fld in enumerate(self.fields): if isinstance(fld, Table): tbl_len = len(fld) num_spec = set(fld["ref"]) msg += f"[{ifld}]: Table with {tbl_len} rows, referencing spectra {num_spec} \n" elif isinstance(fld, (fits.ImageHDU, fits.PrimaryHDU)): im_size = fld.data.shape if fld.data is not None else "<empty>" num_spec = "-" msg += f"[{ifld}]: ImageHDU with size {im_size}" if "SPEC_REF" in self.fields[ifld].header: num_spec = self.fields[ifld].header["SPEC_REF"] msg += f", referencing spectrum {num_spec}" msg += "\n" return msg