# -*- coding: utf-8 -*-
from itertools import product
from collections.abc import Iterable
import numpy as np
from astropy import units as u
from astropy.wcs import WCS
from astropy.io import fits
from astropy.table import Table
from astropy.utils.exceptions import AstropyWarning
from scipy import ndimage as ndi
from ..utils import (unit_from_table, quantity_from_table, has_needed_keywords,
get_logger)
logger = get_logger(__name__)
def _make_bounding_header_from_headers(*headers, pixel_scale=1*u.arcsec):
"""
Return a Header with WCS and NAXISn keywords bounding all input ImageHDUs.
Parameters
----------
headers : list of fits.ImageHDU
pixel_scale : u.Quantity
[arcsec]
Returns
-------
hdr : fits.Header
"""
wcs_suffix = "D" if pixel_scale.unit.physical_type == "length" else ""
unit = u.Unit(_get_unit_from_headers(*headers, wcs_suffix=wcs_suffix))
if unit.physical_type == "angle":
unit = "deg"
pixel_scale = pixel_scale.to(u.deg).value
else:
pixel_scale = pixel_scale.to(unit).value
extents = [calc_footprint(header, wcs_suffix, unit) for header in headers]
pnts = np.vstack(extents)
hdr = header_from_list_of_xy(pnts[:, 0], pnts[:, 1],
pixel_scale, wcs_suffix)
hdr["NAXIS1"] += 1
hdr["NAXIS2"] += 1
hdr[f"CRVAL1{wcs_suffix}"] -= 0.5 * pixel_scale
hdr[f"CRVAL2{wcs_suffix}"] -= 0.5 * pixel_scale
return hdr
def _make_bounding_header_for_tables(*tables, pixel_scale=1*u.arcsec):
"""
Return a Header with WCS and NAXISn keywords bounding all input Tables.
Parameters
----------
tables : list of astropy.Tables
[arcsec] Must contain columns: "x", "y"
pixel_scale : u.Quantity
[arcsec]
Returns
-------
hdr : fits.Header
"""
wcs_suffix = "D" if pixel_scale.unit.physical_type == "length" else ""
# FIXME: Convert to deg here? If yes, remove the arcsec=True below...
# Note: this could all be a lot simpler if we have consistent units, i.e.
# don't need to convert mm -> mm and arcsec -> arcsec (or deg)
new_unit = u.mm if wcs_suffix == "D" else u.arcsec # u.deg
tbl_unit = u.mm if wcs_suffix == "D" else u.arcsec
x_name = "x_mm" if wcs_suffix == "D" else "x"
y_name = "y_mm" if wcs_suffix == "D" else "y"
pixel_scale = pixel_scale.to(new_unit)
extents = []
for table in tables:
extent = calc_table_footprint(table, x_name, y_name,
tbl_unit, new_unit,
padding=pixel_scale)
extents.append(extent)
pnts = np.vstack(extents)
# TODO: check if this could just use create_wcs_from_points
hdr = header_from_list_of_xy(pnts[:, 0], pnts[:, 1], pixel_scale.value,
wcs_suffix, arcsec=wcs_suffix != "D")
return hdr
[docs]def create_wcs_from_points(points: np.ndarray,
pixel_scale: float,
wcs_suffix: str = "") -> tuple[WCS, np.ndarray]:
"""
Create `astropy.wcs.WCS` instance that fits all points inside.
Parameters
----------
corners : (N, 2) array
2D array of N >= 2 points in the form of [x, y].
pixel_scale : float
DESCRIPTION.
wcs_suffix : str, optional
DESCRIPTION. The default is "".
Returns
-------
new_wcs : TYPE
Newly created WCS instance.
naxis : TYPE
Array of NAXIS needed to fit all points.
"""
# TODO: should pixel_scale be called pixel_size by conventions elsewhere?
# TODO: add quantity stuff to docstring
# TODO: find out how this plays with chunks
if wcs_suffix != "D":
points = _fix_360(points)
# TODO: test whether abs(pixel_scale) breaks anything
pixel_scale = abs(pixel_scale)
extent = points.ptp(axis=0) / pixel_scale
naxis = extent.round().astype(int)
# FIXME: Woule be nice to have D headers referenced at (1, 1), but that
# breaks some things, likely down to rescaling...
# if wcs_suffix == "D":
# crpix = np.array([1., 1.])
# crval = points.min(axis=0)
# else:
if isinstance(naxis, u.Quantity):
naxis = naxis.decompose()
if naxis.unit != "pixel":
raise u.UnitConversionError("If given quantities, must resolve to "
f"pix, got '{naxis.unit}' instead.")
naxis = naxis.value
crpix = (naxis + 1) / 2
crval = (points.min(axis=0) + points.max(axis=0)) / 2
ctype = "LINEAR" if wcs_suffix in "DX" else "RA---TAN"
if isinstance(points, u.Quantity):
cunit = points.unit
else:
if wcs_suffix == "D":
cunit = "mm"
else:
cunit = "deg"
if isinstance(pixel_scale, u.Quantity):
pixel_scale = pixel_scale.value
new_wcs = WCS(key=wcs_suffix)
new_wcs.wcs.ctype = 2 * [ctype]
new_wcs.wcs.cunit = 2 * [cunit]
new_wcs.wcs.cdelt = np.array(2 * [pixel_scale])
new_wcs.wcs.crval = crval
new_wcs.wcs.crpix = crpix
return new_wcs, naxis
[docs]def add_table_to_imagehdu(table: Table, canvas_hdu: fits.ImageHDU,
sub_pixel: bool = True,
wcs_suffix: str = "") -> fits.ImageHDU:
"""
Add files from an astropy.Table to the image of an fits.ImageHDU.
Parameters
----------
table : astropy.Table
Must contain the columns "x_mm", "y_mm", "flux" with the units in the
column attribute .unit, or in the table.meta dictionary as
"<colname>_unit". Default units are ``mm`` and ``ph / s / pix``
canvas_hdu : fits.ImageHDU
The ImageHDU onto which the table files should be projected.
This must include a valid WCS
sub_pixel : bool, optional
Default is True. If True, sub-pixel shifts of files will be taken into
account when projecting onto the canvas pixel grid. This takes about 5x
longer than ignoring the sub-pixel shifts
wcs_suffix : str, optional
Returns
-------
canvas_hdu : fits.ImageHDU
"""
s = wcs_suffix
if not has_needed_keywords(canvas_hdu.header, s):
raise ValueError(f"canvas_hdu must include an appropriate WCS: {s}")
f = quantity_from_table("flux", table, default_unit=u.Unit("ph s-1"))
if s == "D":
x = quantity_from_table("x_mm", table, default_unit=u.mm).to(u.mm)
y = quantity_from_table("y_mm", table, default_unit=u.mm).to(u.mm)
else:
arcsec = u.arcsec
canvas_unit = u.Unit(canvas_hdu.header[f"CUNIT1{s}"])
x = quantity_from_table("x", table, default_unit=arcsec.to(canvas_unit))
y = quantity_from_table("y", table, default_unit=arcsec.to(canvas_unit))
x *= arcsec.to(canvas_unit)
y *= arcsec.to(canvas_unit)
xpix, ypix = val2pix(canvas_hdu.header, x.value, y.value, s)
naxis1 = canvas_hdu.header["NAXIS1"]
naxis2 = canvas_hdu.header["NAXIS2"]
# Occasionally 0 is returned as ~ -1e-11
eps = -1e-7
mask = (xpix >= eps) * (xpix < naxis1) * (ypix >= eps) * (ypix < naxis2)
if sub_pixel:
canvas_hdu = _add_subpixel_sources_to_canvas(
canvas_hdu, xpix, ypix, f, mask)
else:
canvas_hdu = _add_intpixel_sources_to_canvas(
canvas_hdu, xpix, ypix, f, mask)
return canvas_hdu
def _add_intpixel_sources_to_canvas(canvas_hdu, xpix, ypix, flux, mask):
canvas_hdu.header["comment"] = f"Adding {len(flux)} int-pixel files"
for xpx, ypx, flx, msk in zip(xpix.astype(int), ypix.astype(int),
flux, mask):
# To prevent adding array values in this manner.
assert not isinstance(xpx, Iterable), "xpx should be integer"
canvas_hdu.data[ypx, xpx] += flx.value * msk
return canvas_hdu
def _add_subpixel_sources_to_canvas(canvas_hdu, xpix, ypix, flux, mask):
canvas_hdu.header["comment"] = f"Adding {len(flux)} sub-pixel files"
canvas_shape = canvas_hdu.data.shape
for xpx, ypx, flx, msk in zip(xpix, ypix, flux, mask):
if msk:
xx, yy, fracs = sub_pixel_fractions(xpx, ypx)
for x, y, frac in zip(xx, yy, fracs):
if y < canvas_shape[0] and x < canvas_shape[1]:
# To prevent adding array values in this manner.
assert not isinstance(x, Iterable), "x should be integer"
canvas_hdu.data[y, x] += frac * flx.value
return canvas_hdu
[docs]def sub_pixel_fractions(x, y):
"""
Make a list of pixel coordinates and weights to reflect sub-pixel shifts.
A point source which isn't centred on a pixel can be modelled by a centred
PSF convolved with a shifted delta function. A fraction of the delta
function in moved into each of the adjoining pixels. For example, a star
at ``(x,y)=(0.2, 0.2)`` would be represented by a following pixel weights::
---------------
| 0.16 | 0.04 |
---------------
| 0.64 | 0.16 |
---------------
where (0,0) is the centre of the bottom-left pixel
Given (x,y) pixel coordinates, this function returns the fractions of flux
that should go into the surrounding pixels, as well as the coordinates of
those neighbouring pixels.
Parameters
----------
x, y : float
Returns
-------
x_pix, y_pix, fracs : list of (int, int, float)
The x and y pixel coordinates and their corresponding flux fraction
"""
x0, dx = divmod(x, 1)
y0, dy = divmod(y, 1)
xi0 = int(x0)
xi1 = xi0 + bool(dx)
yi0 = int(y0)
yi1 = yi0 + bool(dy)
f00 = (1. - dx) * (1. - dy)
f01 = (1. - dx) * dy
f10 = dx * (1 - dy)
f11 = dx * dy
x_pix = [xi0, xi1, xi0, xi1]
y_pix = [yi0, yi0, yi1, yi1]
fracs = [f00, f10, f01, f11]
return x_pix, y_pix, fracs
[docs]def overlay_image(small_im, big_im, coords, mask=None, sub_pixel=False):
"""
Overlay small_im on top of big_im at the position specified by coords.
``small_im`` will be centred at ``coords``
Adapted from:
``https://stackoverflow.com/questions/14063070/overlay-a-smaller-image-on-a-larger-image-python-opencv``
"""
# TODO - Add in a catch for sub-pixel shifts
if sub_pixel:
raise NotImplementedError
# FIXME: this would not be necessary if we used WCS instead of manual 2pix
coords = np.ceil(np.asarray(coords).round(10)).astype(int)
y, x = coords.astype(int)[::-1] - np.array(small_im.shape[-2:]) // 2
# Image ranges
x1, x2 = max(0, x), min(big_im.shape[-1], x + small_im.shape[-1])
y1, y2 = max(0, y), min(big_im.shape[-2], y + small_im.shape[-2])
# Overlay ranges
x1o, x2o = max(0, -x), min(small_im.shape[-1], big_im.shape[-1] - x)
y1o, y2o = max(0, -y), min(small_im.shape[-2], big_im.shape[-2] - y)
# Exit if nothing to do
if y1 >= y2 or x1 >= x2 or y1o >= y2o or x1o >= x2o:
return big_im
if small_im.ndim == 2 and big_im.ndim == 2:
small_im_3 = small_im[None, :, :]
big_im_3 = big_im[None, :, :]
elif small_im.ndim == 3 and big_im.ndim == 3:
small_im_3 = small_im
big_im_3 = big_im
else:
raise ValueError(f"Dimensions mismatch between big_im and small_im: "
f"{big_im.ndim} : {small_im.ndim}")
if mask is None:
big_im_3[:, y1:y2, x1:x2] = big_im_3[:, y1:y2, x1:x2] + \
small_im_3[:, y1o:y2o, x1o:x2o]
else:
mask = mask[None, y1o:y2o, x1o:x2o] * np.ones(small_im_3.shape[-3])
mask = mask.astype(bool)
big_im_3[:, y1:y2, x1:x2][mask] = big_im_3[:, y1:y2, x1:x2][mask] + \
small_im_3[:, y1o:y2o, x1o:x2o][mask]
return big_im
[docs]def rescale_imagehdu(imagehdu: fits.ImageHDU, pixel_scale: float,
wcs_suffix: str = "", conserve_flux: bool = True,
spline_order: int = 1) -> fits.ImageHDU:
"""
Scale the .data array by the ratio of pixel_scale [deg] and CDELTn.
`pixel_scale` should NOT be passed as a Quantity!
Parameters
----------
imagehdu : fits.ImageHDU
pixel_scale : float
[deg] NOT to be passed as a Quantity
wcs_suffix : str
conserve_flux : bool
spline_order : int
[1..5] Order of the spline interpolation used by
``scipy.ndimage.rotate``
Returns
-------
imagehdu : fits.ImageHDU
"""
# WCS needs single space as key for default wcs
wcs_suffix = wcs_suffix or " "
primary_wcs = WCS(imagehdu.header, key=wcs_suffix[0])
# making sure all are positive
zoom = np.abs(primary_wcs.wcs.cdelt / pixel_scale)
if primary_wcs.naxis == 3:
# zoom = np.append(zoom, [1])
zoom[2] = 1.
if primary_wcs.naxis != imagehdu.data.ndim:
logger.warning("imagehdu.data.ndim is %d, but primary_wcs.naxis with "
"key %s is %d, both should be equal.",
imagehdu.data.ndim, wcs_suffix, primary_wcs.naxis)
zoom = zoom[:2]
logger.debug("zoom %s", zoom)
if all(zoom == 1.):
# Nothing to do
return imagehdu
sum_orig = np.sum(imagehdu.data)
# Not sure why the reverse order is necessary here
new_im = ndi.zoom(imagehdu.data, zoom[::-1], order=spline_order)
if conserve_flux:
new_im = np.nan_to_num(new_im, copy=False)
sum_new = np.sum(new_im)
if sum_new != 0:
new_im *= sum_orig / sum_new
imagehdu.data = new_im
for key in wcs_suffix:
# TODO: can this be done with astropy wcs sub-wcs? or wcs.find_all_wcs?
ww = WCS(imagehdu.header, key=key)
if ww.naxis != imagehdu.data.ndim:
logger.warning("imagehdu.data.ndim is %d, but wcs.naxis with key "
"%s is %d, both should be equal.",
imagehdu.data.ndim, key, ww.naxis)
# TODO: could this be ww = ww.sub(2) instead? or .celestial?
ww = WCS(imagehdu.header, key=key, naxis=imagehdu.data.ndim)
if any(ctype != "LINEAR" for ctype in ww.wcs.ctype):
logger.warning("Non-linear WCS rescaled using linear procedure.")
new_crpix = (zoom + 1) / 2 + (ww.wcs.crpix - 1) * zoom
new_crpix = np.round(new_crpix * 2) / 2 # round to nearest half-pixel
logger.debug("new crpix %s", new_crpix)
ww.wcs.crpix = new_crpix
# Keep CDELT3 if cube...
new_cdelt = ww.wcs.cdelt[:]
new_cdelt[0] = pixel_scale
new_cdelt[1] = pixel_scale
ww.wcs.cdelt = new_cdelt
# TODO: is forcing deg here really the best way?
# FIXME: NO THIS WILL MESS UP IF new_cdelt IS IN ARCSEC!!!!!
# new_cunit = [str(cunit) for cunit in ww.wcs.cunit]
# new_cunit[0] = "mm" if key == "D" else "deg"
# new_cunit[1] = "mm" if key == "D" else "deg"
# ww.wcs.cunit = new_cunit
imagehdu.header.update(ww.to_header())
return imagehdu
[docs]def reorient_imagehdu(imagehdu: fits.ImageHDU, wcs_suffix: str = "",
conserve_flux: bool = True,
spline_order: int = 1) -> fits.ImageHDU:
"""
Apply an affine transformation to the image, as given in its header.
Parameters
----------
imagehdu : fits.ImageHDU
wcs_suffix : str
conserve_flux : bool
spline_order : int
[1..5] Order of the spline interpolation used by
``scipy.ndimage.rotate``
Returns
-------
imagehdu : fits.ImageHDU
"""
s = wcs_suffix
hdr = imagehdu.header
pc_keys = ["PC1_1", "PC1_2", "PC2_1", "PC2_2"]
if all(key+s in hdr for key in pc_keys) and imagehdu.data is not None:
xscen, yscen = pix2val(hdr, hdr["NAXIS1"] / 2., hdr["NAXIS2"] / 2., s)
hdr["CRVAL1" + s] = xscen
hdr["CRVAL2" + s] = yscen
mat = np.array([[hdr["PC1_1" + s], hdr["PC1_2" + s], 0],
[hdr["PC2_1" + s], hdr["PC2_2" + s], 0],
[0, 0, 1]])
if imagehdu.data.ndim == 2:
mat = mat[:2, :2]
new_im = affine_map(imagehdu.data, matrix=mat, reshape=True,
spline_order=spline_order)
if conserve_flux:
new_im = np.nan_to_num(new_im, copy=False)
new_im *= np.sum(imagehdu.data) / np.sum(new_im)
imagehdu.data = new_im
hdr[f"CRPIX1{s}"] = hdr["NAXIS1"] / 2.
hdr[f"CRPIX2{s}"] = hdr["NAXIS2"] / 2.
for card in [f"PC1_1{s}", f"PC1_2{s}", f"PC2_1{s}", f"PC2_2{s}"]:
hdr.remove(card)
imagehdu.header = hdr
elif any("PC1_1" in key for key in imagehdu.header):
logger.warning(("PC Keywords were found, but not used due to "
"different wcs_suffix given: %s \n %s"),
wcs_suffix, dict(imagehdu.header))
return imagehdu
[docs]def affine_map(input, matrix=None, rotation_angle: float = 0.,
shear_angle: float = 0., scale_factor=None,
reshape: bool = True, spline_order: int = 3):
"""
Apply an affine transformation matrix to an image around its centre.
Similar functionality to ``scipy.ndimage.rotate`` but for the
``affine_transformation`` function
Either a 2x2 affine transformation matrix can be supplied, or the rotation,
shear, and scaling values which are the basis of an affine transformation.
Parameters
----------
input : array_like
The input array
matrix : ndarray, optional
A 2x2 affine transformation matrix
rotation_angle : float, optional
[deg] If matrix==None, a rotation matrix is built from this angle
shear_angle : float, optional
[deg] If matrix==None, a y-axis shear matrix is built from this angle
scale_factor : list, array
[mx, my] If matrix==None, a scaling matrix is built from this list
reshape : bool, optional
If True, the array is re-sized to contain the whole transformed image
spline_order : int, optional
Default is 3. Spline interpolation order
Returns
-------
output : array-like
The new mapping of the image
"""
if matrix is None:
d2r = np.pi / 180.
c = np.cos(rotation_angle * d2r) if rotation_angle != 0. else 1.
s = np.sin(rotation_angle * d2r) if rotation_angle != 0. else 0.
t = np.tan(shear_angle * d2r) if shear_angle != 0. else 0.
sx = scale_factor[0] if scale_factor is not None else 1.
sy = scale_factor[1] if scale_factor is not None else 1.
mat = (np.array([[c, s], [-s, c]]) @
np.array([[1, t], [0, 1]]) @
np.array([[sx, 0], [0, sy]]))
else:
mat = np.array(matrix)
h, w = np.array(input.shape)[-2:]
if reshape:
# Find the new corner coordinates using the py3.5+ matmul operator
x, y = mat[:2, :2] @ np.array([[0, w, w, 0], [0, 0, h, h]])
out = np.ceil(np.array([y.max() - y.min(),
x.max() - x.min()])).astype(int)
else:
out = np.array([h, w])
c_out = 0.5 * out
c_in = 0.5 * np.array([h, w])
offset = c_in - c_out.dot(mat[:2, :2])
if mat.shape[0] == 3:
offset = np.r_[[0], offset]
out = np.r_[input.shape[0], out]
output = ndi.affine_transform(input, np.rot90(mat, 2),
output_shape=out, offset=offset,
order=spline_order)
return output
[docs]def add_imagehdu_to_imagehdu(image_hdu: fits.ImageHDU,
canvas_hdu: fits.ImageHDU,
spline_order: int = 1,
wcs_suffix: str = "",
conserve_flux: bool = True) -> fits.ImageHDU:
"""
Re-project one ``fits.ImageHDU`` onto another ``fits.ImageHDU``.
..assumption:: of equal grid coordinate lengths
Parameters
----------
image_hdu : fits.ImageHDU
The ``ImageHDU`` which will be reprojected onto `canvas_hdu`
canvas_hdu : fits.ImageHDU
The ``ImageHDU`` onto which the image_hdu should be projected.
This must include a valid WCS
spline_order : int, optional
Default is 1. The order of the spline interpolator used by the
``scipy.ndimage`` functions
wcs_suffix : str or WCS
To determine which WCS to use. "" for sky HDUs and "D" for
ImagePlane HDUs. Can also be ``astropy.wcs.WCS`` object.
conserve_flux : bool
Default is True. Used when zooming and rotating to keep flux constant.
Returns
-------
canvas_hdu : fits.ImageHDU
"""
# TODO: Add a catch for projecting a large image onto a small canvas
# FIXME: This can mutate the input HDUs, investigate this and deepcopy
# if necessary.
# TODO: rename wcs_suffix to wcs, ideally always pass WCS in the future...
if isinstance(wcs_suffix, WCS):
canvas_wcs = wcs_suffix
else:
wcs_suffix = wcs_suffix or " "
canvas_wcs = WCS(canvas_hdu.header, key=wcs_suffix, naxis=2)
if isinstance(image_hdu.data, u.Quantity):
image_hdu.data = image_hdu.data.value
assert canvas_wcs.wcs.cdelt[0] == canvas_wcs.wcs.cdelt[1], \
"canvas must have equal pixel scale on both axes"
canvas_pixel_scale = float(canvas_wcs.wcs.cdelt[0])
conv_fac = u.Unit(image_hdu.header[f"CUNIT1{wcs_suffix}"].lower()).to(canvas_wcs.wcs.cunit[0])
new_hdu = rescale_imagehdu(image_hdu, pixel_scale=canvas_pixel_scale / conv_fac,
wcs_suffix=canvas_wcs.wcs.alt,
spline_order=spline_order,
conserve_flux=conserve_flux)
# TODO: Perhaps add separately formatted WCS logger?
# logger.debug("fromrescale %s", WCS(new_hdu.header, key=canvas_wcs.wcs.alt))
new_hdu = reorient_imagehdu(new_hdu,
wcs_suffix=canvas_wcs.wcs.alt,
spline_order=spline_order,
conserve_flux=conserve_flux)
img_center = np.array([[new_hdu.header["NAXIS1"],
new_hdu.header["NAXIS2"]]])
img_center = (img_center - 1) / 2
new_wcs = WCS(new_hdu.header, key=canvas_wcs.wcs.alt, naxis=2)
sky_center = new_wcs.wcs_pix2world(img_center, 0)
if new_wcs.wcs.cunit[0] == "deg":
sky_center = _fix_360(sky_center)
# logger.debug("canvas %s", canvas_wcs)
# logger.debug("new %s", new_wcs)
logger.debug("sky %s", sky_center)
sky_center *= conv_fac
pix_center = canvas_wcs.wcs_world2pix(sky_center, 0)
logger.debug("pix %s", pix_center)
canvas_hdu.data = overlay_image(new_hdu.data, canvas_hdu.data,
coords=pix_center.squeeze())
return canvas_hdu
[docs]def pix2val(header, x, y, wcs_suffix=""):
"""
Return the real coordinates [deg, mm] for coordinates from a Header WCS.
Parameters
----------
header : fits.Header
x, y : float, list, array
wcs_suffix : str
Returns
-------
a, b : float, array
[deg, mm] Real coordinates as given by the Header WCS
"""
s = wcs_suffix
pckeys = [key + s for key in ["PC1_1", "PC1_2", "PC2_1", "PC2_2"]]
if all(key in header for key in pckeys):
pc11, pc12, pc21, pc22 = (header[key] for key in pckeys)
else:
pc11, pc12, pc21, pc22 = 1, 0, 0, 1
if (pc11 * pc22 - pc12 * pc21) != 1.0:
logger.error("PC matrix det != 1.0")
da = header["CDELT1"+s]
db = header["CDELT2"+s]
x0 = header["CRPIX1"+s] - 1
y0 = header["CRPIX2"+s] - 1
a0 = header["CRVAL1"+s]
b0 = header["CRVAL2"+s]
a = a0 + da * ((x - x0) * pc11 + (y - y0) * pc12)
b = b0 + db * ((x - x0) * pc21 + (y - y0) * pc22)
return a, b
[docs]def val2pix(header, a, b, wcs_suffix=""):
"""
Return the pixel coordinates for real coordinates [deg, mm] from a WCS.
Parameters
----------
header : fits.Header
a, b : float, list, array
[deg, mm]
Returns
-------
x, y : float, array
[pixel] Pixel coordinates as given by the Header WCS
"""
s = wcs_suffix
if isinstance(header, fits.ImageHDU):
header = header.header
pckeys = [key + s for key in ["PC1_1", "PC1_2", "PC2_1", "PC2_2"]]
if all(key in header for key in pckeys):
pc11, pc12, pc21, pc22 = (header[key] for key in pckeys)
else:
pc11, pc12, pc21, pc22 = 1, 0, 0, 1
if (pc11 * pc22 - pc12 * pc21) != 1.0:
logger.error("PC matrix det != 1.0")
da = float(header["CDELT1"+s])
db = float(header["CDELT2"+s])
x0 = float(header["CRPIX1"+s]) - 1
y0 = float(header["CRPIX2"+s]) - 1
a0 = float(header["CRVAL1"+s])
b0 = float(header["CRVAL2"+s])
x = x0 + 1. / da * ((a - a0) * pc11 - (b - b0) * pc21)
y = y0 + 1. / db * ((a - a0) * pc12 + (b - b0) * pc22)
return x, y
def _fix_360(arr):
"""Fix the "full circle overflow" that occurs with deg."""
if isinstance(arr, u.Quantity):
arr[arr > 270*u.deg] -= 360*u.deg
arr[arr <= -90*u.deg] += 360*u.deg
else:
arr = np.asarray(arr)
arr[arr > 270] -= 360
arr[arr <= -90] += 360
return arr
def _get_unit_from_headers(*headers, wcs_suffix: str = "") -> str:
unit = headers[0][f"CUNIT1{wcs_suffix}"].lower()
assert all(header[f"CUNIT{i}{wcs_suffix}"].lower() == unit
for header, i in product(headers, range(1, 3))), \
[(i, header[f"CUNIT{i}{wcs_suffix}"])
for header, i in product(headers, range(1, 3))]
return unit
[docs]def det_wcs_from_sky_wcs(sky_wcs: WCS,
pixel_scale: float,
plate_scale: float,
naxis=None) -> tuple[WCS, np.ndarray]:
"""
Create detector WCS from celestial WCS using pixel and plate scales.
Parameters
----------
sky_wcs : astropy.wcs.WCS
Celestial WCS.
pixel_scale : float
Quantity or float (assumed to be arcsec / pixel).
plate_scale : float
Quantity or float (assumed to be arcsec / mm).
naxis : (int, int), optional
Shape of the image, usually ``NAXIS1`` and ``NAXIS2``. If the input WCS
holds this information, the default None will use that. Otherwise not
providing `naxis` will raise and error.
Returns
-------
det_wcs : astropy.wcs.WCS
Detector WCS.
det_naxis : (int, int)
Shape of the image (``NAXIS1``, ``NAXIS2``).
"""
# TODO: Using astropy units for now to avoid deg vs. arcsec confusion.
# Once Scopesim is consistent there, remove astropy units.
pixel_scale <<= u.arcsec / u.pixel
plate_scale <<= u.arcsec / u.mm
logger.debug("Pixel scale: %s", pixel_scale)
logger.debug("Plate scale: %s", plate_scale)
pixel_size = pixel_scale / plate_scale
# TODO: add check if cunit is consistent along all axes
cunit = sky_wcs.wcs.cunit[0]
corners = sky_wcs.calc_footprint(center=False, axes=naxis) * cunit
logger.debug("WCS sky corners:\n%s", corners)
corners /= plate_scale
corners = corners.to(u.mm)
logger.debug("WCS det corners:\n%s", corners)
return create_wcs_from_points(corners, pixel_size, "D")
[docs]def sky_wcs_from_det_wcs(det_wcs: WCS,
pixel_scale: float,
plate_scale: float,
naxis=None) -> tuple[WCS, np.ndarray]:
"""
Create celestial WCS from detector WCS using pixel and plate scales.
Parameters
----------
det_wcs : astropy.wcs.WCS
Detector WCS.
pixel_scale : float
Quantity or float (assumed to be arcsec / pixel).
plate_scale : float
Quantity or float (assumed to be arcsec / mm).
naxis : (int, int), optional
Shape of the image, usually ``NAXIS1`` and ``NAXIS2``. If the input WCS
holds this information, the default None will use that. Otherwise not
providing `naxis` will raise and error.
Returns
-------
sky_wcs : astropy.wcs.WCS
Celestial WCS.
sky_naxis : (int, int)
Shape of the image (``NAXIS1``, ``NAXIS2``).
"""
# TODO: Using astropy units for now to avoid deg vs. arcsec confusion.
# Once Scopesim is consistent there, remove astropy units.
pixel_scale <<= u.arcsec / u.pixel
plate_scale <<= u.arcsec / u.mm
logger.debug("Pixel scale: %s", pixel_scale)
logger.debug("Plate scale: %s", plate_scale)
# TODO: add check if cunit is consistent along all axes
cunit = det_wcs.wcs.cunit[0]
corners = det_wcs.calc_footprint(center=False, axes=naxis) * cunit
logger.debug("WCS det corners: %s", corners)
corners *= plate_scale
corners = corners.to(u.arcsec)
logger.debug("WCS sky corners: %s", corners)
return create_wcs_from_points(corners, pixel_scale)