Source code for scopesim.optics.fov_utils
import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.table import Table, Column
from astropy.wcs import WCS
from synphot import SourceSpectrum, Empirical1D
from . import image_plane_utils as imp_utils
from ..utils import from_currsys, quantify, quantity_from_table, get_logger
logger = get_logger(__name__)
[docs]def is_field_in_fov(fov_header, field, wcs_suffix=""):
"""
Return True if Source.field footprint is inside the FieldOfView footprint.
Parameters
----------
fov_header : fits.Header
Header from a FieldOfView object
field : [astropy.Table, astropy.ImageHDU]
Field object from a Source object
wcs_suffix : str
["S", "D"] Coordinate system: Sky or Detector
Returns
-------
is_inside_fov : bool
"""
if isinstance(field, fits.ImageHDU) and \
field.header.get("BG_SRC") is not None:
is_inside_fov = True
else:
if isinstance(field, Table):
x = list(quantity_from_table("x", field,
u.arcsec).to(u.deg).value)
y = list(quantity_from_table("y", field,
u.arcsec).to(u.deg).value)
s = wcs_suffix
# cdelt = quantify(fov_header["CDELT1" + s], u.deg).value
cdelt = fov_header[f"CDELT1{s}"] * u.Unit(fov_header[f"CUNIT1{s}"]).to(u.deg)
field_header = imp_utils.header_from_list_of_xy(x, y, cdelt, s)
elif isinstance(field, (fits.ImageHDU, fits.PrimaryHDU)):
field_header = field.header
else:
logger.warning("Input was neither Table nor ImageHDU: %s", field)
return False
xy = imp_utils.calc_footprint(field_header, wcs_suffix)
ext_xsky, ext_ysky = xy[:, 0], xy[:, 1]
xy = imp_utils.calc_footprint(fov_header, wcs_suffix)
fov_xsky, fov_ysky = xy[:, 0], xy[:, 1]
fov_xsky *= u.Unit(fov_header["CUNIT1"].lower()).to(u.deg)
fov_ysky *= u.Unit(fov_header["CUNIT2"].lower()).to(u.deg)
is_inside_fov = (min(ext_xsky) < max(fov_xsky) and
max(ext_xsky) > min(fov_xsky) and
min(ext_ysky) < max(fov_ysky) and
max(ext_ysky) > min(fov_ysky))
return is_inside_fov
[docs]def make_flux_table(source_tbl, src, wave_min, wave_max, area):
fluxes = np.zeros(len(src.spectra))
ref_set = list(set(source_tbl["ref"]))
flux_set = src.photons_in_range(wave_min, wave_max, area, ref_set)
fluxes[ref_set] = flux_set
ref = source_tbl["ref"]
weight = source_tbl["weight"]
flux_col = Column(name="flux", data=fluxes[ref] * weight)
x_col = source_tbl["x"]
y_col = source_tbl["y"]
tbl = Table()
tbl.add_columns([x_col, y_col, flux_col])
return tbl
[docs]def combine_table_fields(fov_header, src, field_indexes):
"""
Combine list of Table objects into a single one bounded by the Header WCS.
Parameters
----------
fov_header : fits.Header
Header from a FieldOfView objects
src : Source object
field_indexes : list of int
Returns
-------
tbl : Table
"""
xy = imp_utils.calc_footprint(fov_header)
fov_xsky, fov_ysky = xy[:, 0], xy[:, 1]
x, y, ref, weight = [], [], [], []
for ii in field_indexes:
field = src.fields[ii]
if isinstance(field, Table):
xcol = quantity_from_table("x", field, u.arcsec)
ycol = quantity_from_table("y", field, u.arcsec)
x += list(xcol.to(u.deg).value)
y += list(ycol.to(u.deg).value)
ref += list(field["ref"])
weight += list(field["weight"])
x = np.array(x)
y = np.array(y)
mask = (np.array(x < max(fov_xsky)) * np.array(x > min(fov_xsky)) *
np.array(y < max(fov_ysky)) * np.array(y > min(fov_ysky)))
x = x[mask]
y = y[mask]
ref = np.array(ref)[mask]
weight = np.array(weight)[mask]
tbl = Table(names=["x", "y", "ref", "weight"], data=[x, y, ref, weight])
tbl["x"].unit = u.deg
tbl["y"].unit = u.deg
return tbl
[docs]def combine_imagehdu_fields(fov_header, src, fields_indexes, wave_min, wave_max,
area, wcs_suffix=""):
"""
Combine list of ImageHDUs into a single one bounded by the Header WCS.
Parameters
----------
fov_header : fits.Header
Header from the FieldOfView
src : Source object
fields_indexes : list of ints
Which indexes from <Source>.fields to use
wave_min : float
[deg] Blue spectral border
wave_max : float
[deg] Red spectral border
area : float
[m2] Area of the primary aperture
wcs_suffix : str
Which coordinate system to use
- "" for the on-sky coordinate system
- "D" for the image-plane coordinate system
Returns
-------
canvas_hdu : fits.ImageHDU
"""
image = np.zeros((fov_header["NAXIS2"], fov_header["NAXIS1"]))
canvas_hdu = fits.ImageHDU(header=fov_header, data=image)
spline_order = from_currsys("!SIM.computing.spline_order")
pixel_area = (fov_header["CDELT1"] * fov_header["CDELT2"] *
u.Unit(fov_header["CUNIT1"].lower()).to(u.arcsec) ** 2)
for ii in fields_indexes:
field = src.fields[ii]
if isinstance(field, fits.ImageHDU):
ref = field.header["SPEC_REF"]
flux = src.photons_in_range(wave_min, wave_max, area,
indexes=[ref])
image = np.zeros((fov_header["NAXIS2"], fov_header["NAXIS1"]))
temp_hdu = fits.ImageHDU(header=fov_header, data=image)
if field.header.get("BG_SRC", False) and \
field.header["NAXIS1"] <= 1 and \
field.header["NAXIS2"] <= 1:
# .. todo: check if we need to take pixel_scale into account
temp_hdu.data += flux[0].value * pixel_area
else:
temp_hdu = imp_utils.add_imagehdu_to_imagehdu(
field, temp_hdu, spline_order, wcs_suffix)
temp_hdu.data *= flux[0].value
canvas_hdu.data += temp_hdu.data
return canvas_hdu
[docs]def sky2fp(header, xsky, ysky):
"""
Convert sky coordinates to image plane coordinated.
Parameters
----------
header : Header
Header of a FieldOfView object which contains two sets of WCS keywords
xsky, ysky : float, array
[deg] The on-sky coordinated
Returns
-------
xdet, ydet : float, array
[mm] The coordinated on the image plane
"""
xpix, ypix = imp_utils.val2pix(header, xsky, ysky)
xdet, ydet = imp_utils.pix2val(header, xpix, ypix, "D")
return xdet, ydet
[docs]def extract_common_field(field, fov_volume):
"""
Extract the overlapping parts of a field within a FOV volume.
Parameters
----------
field : Table or ImageHDU
fov_volume : dict
Contains {"xs": [xmin, xmax], "ys": [ymin, ymax],
"waves": [wave_min, wave_max],
"xy_unit": "deg" or "mm", "wave_unit": "um"}
Returns
-------
field_new : Table or ImageHDU
"""
if isinstance(field, Table):
field_new = extract_area_from_table(field, fov_volume)
elif isinstance(field, fits.ImageHDU):
field_new = extract_area_from_imagehdu(field, fov_volume)
else:
raise ValueError("field must be either Table or ImageHDU, but is "
f"{type(field)}")
return field_new
[docs]def extract_area_from_table(table, fov_volume):
"""
Extract the entries of a ``Table`` that fits inside the `fov_volume`.
Parameters
----------
table : fits.ImageHDU
The field ImageHDU, either an image of a wavelength [um] cube
fov_volume : dict
Contains {"xs": [xmin, xmax], "ys": [ymin, ymax],
"waves": [wave_min, wave_max],
"xy_unit": "deg" or "mm", "wave_unit": "um"}
Returns
-------
new_imagehdu : fits.ImageHDU
"""
fov_unit = u.Unit(fov_volume["xy_unit"])
fov_xs = (fov_volume["xs"] * fov_unit).to(table["x"].unit)
fov_ys = (fov_volume["ys"] * fov_unit).to(table["y"].unit)
mask = ((table["x"].data >= fov_xs[0].value) *
(table["x"].data < fov_xs[1].value) *
(table["y"].data >= fov_ys[0].value) *
(table["y"].data < fov_ys[1].value))
table_new = table[mask]
return table_new
[docs]def extract_area_from_imagehdu(imagehdu, fov_volume):
"""
Extract the part of a ``ImageHDU`` that fits inside the `fov_volume`.
Parameters
----------
imagehdu : fits.ImageHDU
The field ImageHDU, either an image or a cube with wavelength [um]
fov_volume : dict
Contains {"xs": [xmin, xmax], "ys": [ymin, ymax],
"waves": [wave_min, wave_max],
"xy_unit": "deg" or "mm", "wave_unit": "um"}
Returns
-------
new_imagehdu : fits.ImageHDU
"""
hdr = imagehdu.header
image_wcs = WCS(hdr, naxis=2)
naxis1, naxis2 = hdr["NAXIS1"], hdr["NAXIS2"]
xy_hdu = image_wcs.calc_footprint(center=False, axes=(naxis1, naxis2))
if image_wcs.wcs.cunit[0] == "deg":
logger.debug("Found 'deg' in image WCS, applying 360 fix.")
imp_utils._fix_360(xy_hdu)
elif image_wcs.wcs.cunit[0] == "arcsec":
logger.debug("Found 'arcsec' in image WCS, converting to deg.")
xy_hdu *= u.arcsec.to(u.deg)
logger.debug("XY HDU:\n%s", xy_hdu)
xy_fov = np.array([fov_volume["xs"], fov_volume["ys"]]).T
if fov_volume["xy_unit"] == "arcsec":
xy_fov *= u.arcsec.to(u.deg)
logger.debug("XY FOV:\n%s", xy_fov)
xy0s = np.array((xy_hdu.min(axis=0), xy_fov.min(axis=0))).max(axis=0)
xy1s = np.array((xy_hdu.max(axis=0), xy_fov.max(axis=0))).min(axis=0)
# Round to avoid floating point madness
xyp = image_wcs.wcs_world2pix(np.array([xy0s, xy1s]), 0).round(7)
# To deal with negative CDELTs
logger.debug("xyp:\n%s", xyp)
xyp.sort(axis=0)
logger.debug("xyp:\n%s", xyp)
xy0p = np.max(((0, 0), np.floor(xyp[0]).astype(int)), axis=0)
xy1p = np.min(((naxis1, naxis2), np.ceil(xyp[1]).astype(int)), axis=0)
# Add 1 if the same
xy1p += (xy0p == xy1p)
logger.debug("xy0p: %s; xy1p: %s", xy0p, xy1p)
new_wcs, new_naxis = imp_utils.create_wcs_from_points(
np.array([xy0s, xy1s]), pixel_scale=hdr["CDELT1"])
new_hdr = new_wcs.to_header()
new_hdr.update({"NAXIS1": new_naxis[0], "NAXIS2": new_naxis[1]})
if hdr["NAXIS"] == 3:
# Look 0.5*wdel past the fov edges in each direction to catch any
# slices where the middle wavelength value doesn't fall inside the
# fov waverange, but up to 50% of the slice is actually inside the
# fov waverange:
# E.g. FOV: [1.92, 2.095], HDU bin centres: [1.9, 2.0, 2.1]
# CDELT3 = 0.1, and HDU bin edges: [1.85, 1.95, 2.05, 2.15]
# So 1.9 slice needs to be multiplied by 0.3, and 2.1 slice should be
# multipled by 0.45 to reach the scaled contribution of the edge slices
# This scaling factor is:
# f = ((hdu_bin_centre - fov_edge [+/-] 0.5 * cdelt3) % cdelt3) / cdelt3
hdu_waves = get_cube_waveset(hdr)
wdel = hdr["CDELT3"]
wunit = u.Unit(hdr.get("CUNIT3", "AA"))
fov_waves = quantify(fov_volume["waves"], u.um).to(wunit).value
mask = ((hdu_waves > fov_waves[0] - 0.5 * wdel) *
(hdu_waves <= fov_waves[1] + 0.5 * wdel)) # need to go [+/-] half a bin
# OC [2021-12-14] if fov range is not covered by the source return nothing
if not np.any(mask):
logger.warning("FOV %s um - %s um: not covered by Source",
fov_waves[0], fov_waves[1])
# FIXME: returning None here breaks the principle that a function
# should always return the same type. Maybe this should
# instead raise an exception that's caught higher up...
return None
i0p, i1p = np.where(mask)[0][0], np.where(mask)[0][-1]
f0 = (abs(hdu_waves[i0p] - fov_waves[0] + 0.5 * wdel) % wdel) / wdel # blue edge
f1 = (abs(hdu_waves[i1p] - fov_waves[1] - 0.5 * wdel) % wdel) / wdel # red edge
data = imagehdu.data[i0p:i1p+1,
xy0p[1]:xy1p[1],
xy0p[0]:xy1p[0]]
data[0, :, :] *= f0
if i1p > i0p:
data[-1, :, :] *= f1
# w0, w1 : the closest cube wavelengths outside the fov edge wavelengths
# fov_waves : the fov edge wavelengths
# f0, f1 : the scaling factors for the blue and red edge cube slices
#
# w0, w1 = hdu_waves[i0p], hdu_waves[i1p]
new_hdr.update({"NAXIS": 3,
"NAXIS3": data.shape[0],
"CRVAL3": hdu_waves[i0p],
"CRPIX3": 0,
"CDELT3": hdr["CDELT3"],
"CUNIT3": hdr["CUNIT3"],
"CTYPE3": hdr["CTYPE3"],
"BUNIT": hdr["BUNIT"]})
else:
data = imagehdu.data[xy0p[1]:xy1p[1],
xy0p[0]:xy1p[0]]
new_hdr["SPEC_REF"] = hdr.get("SPEC_REF")
if not data.size:
logger.warning("Empty image HDU.")
new_imagehdu = fits.ImageHDU(data=data)
new_imagehdu.header.update(new_hdr)
return new_imagehdu
[docs]def get_cube_waveset(hdr, return_quantity=False):
wval, wdel, wpix, wlen, = [hdr[kw] for kw in ["CRVAL3", "CDELT3",
"CRPIX3", "NAXIS3"]]
# ASSUMPTION - cube wavelength is in regularly spaced units of um
wmin = wval - wdel * wpix
wmax = wmin + wdel * (wlen - 1)
wunit = u.Unit(hdr.get("CUNIT3", "AA"))
if "LOG" in hdr.get("CTYPE3", ""):
hdu_waves = np.logspace(wmin, wmax, wlen)
else:
hdu_waves = np.linspace(wmin, wmax, wlen)
if return_quantity:
hdu_waves = hdu_waves << wunit
hdu_waves.to(u.um)
return hdu_waves
[docs]def extract_range_from_spectrum(spectrum, waverange):
if not isinstance(spectrum, SourceSpectrum):
raise ValueError(f"spectrum must be of type synphot.SourceSpectrum: "
f"{type(spectrum)}")
wave_min, wave_max = quantify(waverange, u.um).to(u.AA).value
spec_waveset = spectrum.waveset.to(u.AA).value
mask = (spec_waveset > wave_min) * (spec_waveset < wave_max)
# if sum(mask) == 0:
# logger.info(
# "Waverange does not overlap with Spectrum waveset: %s <> %s for "
# "spectrum %s", [wave_min, wave_max], spec_waveset, spectrum)
if wave_min < min(spec_waveset) or wave_max > max(spec_waveset):
logger.info(("Waverange only partially overlaps with Spectrum waveset: "
"%s <> %s for spectrum %s"),
[wave_min, wave_max], spec_waveset, spectrum)
wave = np.r_[wave_min, spec_waveset[mask], wave_max]
flux = spectrum(wave)
new_spectrum = SourceSpectrum(Empirical1D, points=wave, lookup_table=flux)
new_spectrum.meta.update(spectrum.meta)
return new_spectrum
[docs]def make_cube_from_table(table, spectra, waveset, fov_header, sub_pixel=False):
"""
Parameters
----------
table: astropy.Table
spectra: dict
waveset: np.ndarray
fov_header: fits.Header
sub_pixel: bool, optional
Returns
-------
cube: fits.ImageHDU
Units of ph/s/m2/bin --> should this be ph / (s * m2 * um)?
"""
cube = np.zeros((fov_header["NAXIS2"], fov_header["NAXIS1"], len(waveset)))
dwave = 0.5 * (np.r_[np.diff(waveset), 0] + np.r_[0, np.diff(waveset)])
# ..todo: dwave is questionable here. What should the FOV cube units be?
spec_dict = {i: spec(waveset) * dwave for i, spec in spectra.items()}
cdelt1, cdelt2 = fov_header["CDELT1"], fov_header["CDELT2"]
crval1, crval2 = fov_header["CRVAL1"], fov_header["CRVAL2"]
crpix1, crpix2 = fov_header["CRPIX1"], fov_header["CRPIX2"]
cunit1, cunit2 = fov_header["CUNIT1"], fov_header["CUNIT2"]
xps = (table["x"].to(cunit1.lower()).value - crval1) / cdelt1 + crpix1
yps = (table["y"].to(cunit2.lower()).value - crval2) / cdelt2 + crpix2
refs, weights = table["ref"], table["weight"]
for xp, yp, ref, weight in zip(xps, yps, refs, weights):
cube[int(round(yp)), int(round(xp)), :] += weight * spec_dict[ref].value
cube = np.rollaxis(cube, 2)
cdelt3 = np.diff(waveset[:2]).to(u.um)[0]
hdu = fits.ImageHDU(data=cube)
hdu.header.update(fov_header)
hdu.header.update({"CRVAL3": waveset[0].value,
"CRPIX3": 0,
"CDELT3": cdelt3.value,
"CUNIT3": str(cdelt3.unit),
"CTYPE3": "WAVE"})
return hdu