"""TBA."""
from pathlib import Path
import numpy as np
from astropy import units as u
from astropy.table import Table
from astropy.utils.data import download_file
from astropy.io import ascii as ioascii
from astropy.wcs import WCS
from synphot import SpectralElement, SourceSpectrum, Empirical1D, Observation
from synphot.units import PHOTLAM
from scopesim.source.source_templates import vega_spectrum, st_spectrum, \
ab_spectrum
from ..utils import find_file, quantity_from_table, from_currsys, get_logger
logger = get_logger(__name__)
FILTER_DEFAULTS = {"U": "Generic/Bessell.U",
"B": "Generic/Bessell.B",
"V": "Generic/Bessell.V",
"R": "Generic/Bessell.R",
"I": "Generic/Bessell.I",
"J": "2MASS/2MASS.J",
"H": "2MASS/2MASS.H",
"Ks": "2MASS/2MASS.Ks",
"K": "Generic/Johnson_UBVRIJHKL.K",
"L": "Gemini/NIRI.Lprime-G0207w",
"M": "Gemini/NIRI.Mprime-G0208w",
"N": "Generic/Johnson_UBVRIJHKL.N",
"u": "SLOAN/SDSS.u",
"g": "SLOAN/SDSS.g",
"r": "SLOAN/SDSS.r",
"i": "SLOAN/SDSS.i",
"z": "SLOAN/SDSS.z",
"u'": "SLOAN/SDSS.uprime_filter",
"g'": "SLOAN/SDSS.gprime_filter",
"r'": "SLOAN/SDSS.rprime_filter",
"i'": "SLOAN/SDSS.iprime_filter",
"z'": "SLOAN/SDSS.zprime_filter",
"HAlpha": "Gemini/GMOS-N.Ha",
"PaBeta": "Gemini/NIRI.PaBeta-G0221",
"BrGamma": "Gemini/NIRI.BrG-G0218",
}
PATH_HERE = Path(__file__).parent
PATH_SVO_DATA = PATH_HERE.parent / "data" / "svo"
[docs]def get_filter_effective_wavelength(filter_name):
if isinstance(filter_name, str):
filter_name = from_currsys(filter_name)
wave, trans = download_svo_filter(FILTER_DEFAULTS[filter_name],
return_style="quantity")
eff_wave = np.sum(wave * trans) / np.sum(trans) # convert from Angstrom
eff_wave = eff_wave.to(u.um)
else:
eff_wave = filter_name
return eff_wave
[docs]def download_svo_filter(filter_name, return_style="synphot"):
"""
Query the SVO service for the true transmittance for a given filter.
Copied 1 to 1 from tynt by Brett Morris
Parameters
----------
filter_name : str
Name of the filter as available on the spanish VO filter service
e.g: ``Paranal/HAWKI.Ks``
return_style : str, optional
Defines the format the data is returned
- synphot: synphot.SpectralElement
- table: astropy.table.Table
- quantity: astropy.unit.Quantity [wave, trans]
- array: np.ndarray [wave, trans], where wave is in Angstrom
- vo_table : astropy.table.Table - original output from SVO service
Returns
-------
filt_curve : See return_style
Astronomical filter object.
"""
# The SVO is only accessible over http, not over https.
# noinspection HttpUrlsUsage
url = f"http://svo2.cab.inta-csic.es/theory/fps3/fps.php?ID={filter_name}"
path = find_file(
filter_name,
path=[PATH_SVO_DATA],
silent=True,
)
if not path:
logger.debug("File not found in %s, downloading...", PATH_SVO_DATA)
path = download_file(url, cache=True)
try:
tbl = Table.read(path, format='votable')
except ValueError:
logger.error("Unable to load %s from %s.", filter_name, path)
raise
wave = u.Quantity(tbl['Wavelength'].data.data, u.Angstrom, copy=False)
trans = tbl['Transmission'].data.data
if return_style == "synphot":
filt = SpectralElement(Empirical1D, points=wave, lookup_table=trans)
elif return_style == "table":
filt = Table(data=[wave, trans], names=["wavelength", "transmission"])
filt.meta["wavelength_unit"] = "Angstrom"
elif return_style == "quantity":
filt = [wave, trans]
elif return_style == "array":
filt = [wave.value, trans]
elif return_style == "vo_table":
filt = tbl
else:
raise ValueError("return_style %s unknown.", return_style)
return filt
[docs]def download_svo_filter_list(observatory, instrument, short_names=False,
include=None, exclude=None):
"""
Query the SVO service for a list of filter names for an instrument.
Parameters
----------
observatory : str
Name of the observatory as available on the spanish VO filter service
e.g: ``Paranal/HAWKI.Ks`` --> Paranal
instrument : str
Name of the instrument. Be careful of hyphens etc. E.g. "HAWK-I"
short_names : bool
Default False. If True, the full SVO names (obs/inst.filt) are split to
only return the (filt) part of the name
include, exclude: str
Each a string sequence for excluding or including specific filters
E.g. GTC/OSIRIS has curves for ``sdss_g`` and ``sdss_g_filter``.
We can force the inclusion of only the filter curves by setting
``include="_filter"``.
Returns
-------
names : list
A list of filter names
"""
# The SVO is only accessible over http, not over https.
# noinspection HttpUrlsUsage
base_url = "http://svo2.cab.inta-csic.es/theory/fps3/fps.php?"
url = base_url + f"Facility={observatory}&Instrument={instrument}"
fn = f"{observatory}/{instrument}"
path = find_file(
fn,
path=[PATH_SVO_DATA],
silent=True,
)
if not path:
path = download_file(url, cache=True)
tbl = Table.read(path, format='votable')
names = list(tbl["filterID"])
if short_names:
names = [name.split(".")[-1] for name in names]
if include is not None:
names = [name for name in names if include in name]
if exclude is not None:
names = [name for name in names if exclude not in name]
return names
[docs]def get_filter(filter_name):
# first check locally
# check generics
# check spanish_vo
path = find_file(filter_name, silent=True)
if path is not None:
tbl = ioascii.read(path)
wave = quantity_from_table("wavelength", tbl, u.um).to(u.um)
filt = SpectralElement(Empirical1D, points=wave,
lookup_table=tbl["transmission"])
elif filter_name in FILTER_DEFAULTS:
filt = download_svo_filter(FILTER_DEFAULTS[filter_name])
else:
try:
filt = download_svo_filter(filter_name)
except ConnectionError:
filt = None
return filt
[docs]def get_zero_mag_spectrum(system_name="AB"):
if system_name.lower() in ["vega"]:
spec = vega_spectrum()
elif system_name.lower() in ["ab"]:
spec = ab_spectrum()
elif system_name.lower() in ["st", "hst"]:
spec = st_spectrum()
else:
raise ValueError("system_name %s is unknown", system_name)
return spec
[docs]def zero_mag_flux(filter_name, photometric_system, return_filter=False):
"""
Return the zero magnitude photon flux for a filter.
Acceptable filter names are those given in
``scopesim.effects.ter_curves_utils.FILTER_DEFAULTS`` or a string with an
appropriate name for a filter in the Spanish-VO filter-service. Such
strings must use the naming convention: observatory/instrument.filter. E.g:
``paranal/HAWKI.Ks``, or ``Gemini/GMOS-N.CaT``.
Parameters
----------
filter_name : str
Name of the filter - see above
photometric_system : str
["vega", "AB", "ST"] Name of the photometric system
return_filter : bool, optional
If True, also returns the filter curve object
Returns
-------
flux : float
[PHOTLAM]
filt : ``synphot.SpectralElement``
If ``return_filter`` is True
"""
filt = get_filter(filter_name)
spec = get_zero_mag_spectrum(photometric_system)
obs = Observation(spec, filt)
flux = obs.effstim(flux_unit=PHOTLAM)
if return_filter:
return flux, filt
else:
return flux
[docs]def scale_spectrum(spectrum, filter_name, amplitude):
"""
Scale a SourceSpectrum to a value in a filter.
Parameters
----------
spectrum : synphot.SourceSpectrum
filter_name : str
Name of a filter from
- a local instrument package (available in ``rc.__search_path__``)
- a generic filter name (see ``ter_curves_utils.FILTER_DEFAULTS``)
- a spanish-vo filter service reference (e.g. ``"Paranal/HAWKI.Ks"``)
amplitude : ``astropy.Quantity``, float
The value that the spectrum should have in the given filter. Acceptable
astropy quantities are:
- u.mag : Vega magnitudes
- u.ABmag : AB magnitudes
- u.STmag : HST magnitudes
- u.Jy : Jansky per filter bandpass
Additionally the ``FLAM`` and ``FNU`` units from ``synphot.units`` can
be used when passing the quantity for ``amplitude``:
Returns
-------
spectrum : synphot.SourceSpectrum
Input spectrum scaled to the given amplitude in the given filter
Examples
--------
::
>>> from scopesim.source.source_templates import vega_spectrum
>>> from scopesim.effects.ter_curves_utils as ter_utils
>>>
>>> spec = vega_spectrum()
>>> vega_185 = ter_utils.scale_spectrum(spec, "Ks", -1.85 * u.mag)
>>> ab_0 = ter_utils.scale_spectrum(spec, "Ks", 0 * u.ABmag)
>>> jy_3630 = ter_utils.scale_spectrum(spec, "Ks", 3630 * u.Jy)
"""
if isinstance(amplitude, u.Quantity):
if amplitude.unit.physical_type == "spectral flux density":
if amplitude.unit != u.ABmag:
amplitude = amplitude.to(u.ABmag)
ref_spec = ab_spectrum(amplitude.value)
elif amplitude.unit.physical_type == "spectral flux density wav":
if amplitude.unit != u.STmag:
amplitude = amplitude.to(u.STmag)
ref_spec = st_spectrum(amplitude.value)
elif amplitude.unit == u.mag:
ref_spec = vega_spectrum(amplitude.value)
else:
raise ValueError(f"Units of amplitude must be one of "
f"[u.mag, u.ABmag, u.STmag, u.Jy]: {amplitude}")
else:
ref_spec = vega_spectrum(amplitude)
filt = get_filter(filter_name)
ref_flux = Observation(ref_spec, filt).effstim(flux_unit=PHOTLAM)
real_flux = Observation(spectrum, filt).effstim(flux_unit=PHOTLAM)
scale_factor = ref_flux / real_flux
spectrum *= scale_factor.value
return spectrum
[docs]def apply_throughput_to_cube(cube, thru):
"""
Apply throughput curve to a spectroscopic cube.
Parameters
----------
cube : ImageHDU
three-dimensional image, dimension 0 (in python convention) is the
spectral dimension. WCS is required.
thru : synphot.SpectralElement, synphot.SourceSpectrum
Returns
-------
cube : ImageHDU, header unchanged, data multiplied with
wavelength-dependent throughput.
"""
wcs = WCS(cube.header).spectral
wave_cube = wcs.all_pix2world(np.arange(cube.data.shape[0]), 0)[0]
wave_cube = (wave_cube * u.Unit(wcs.wcs.cunit[0])).to(u.AA)
cube.data *= thru(wave_cube).value[:, None, None]
return cube
[docs]def combine_two_spectra(spec_a, spec_b, action, wave_min, wave_max):
"""
Combine transmission and/or emission spectrum with a common waverange.
Spec_A is the source spectrum
Spec_B is either the transmission or emission that should be applied
Parameters
----------
spec_a : synphot.SourceSpectrum
spec_b : synphot.SpectralElement, synphot.SourceSpectrum
action: str
["multiply", "add"]
wave_min, wave_max : quantity
[Angstrom]
Returns
-------
new_source : synphot.SourceSpectrum
"""
if spec_a.waveset is None:
wave_val = spec_b.waveset.value
else:
wave_val = spec_a.waveset.value
mask = (wave_val > wave_min.value) * (wave_val < wave_max.value)
wave = ([wave_min.value] + list(wave_val[mask]) + [wave_max.value]) * u.AA
if "mult" in action.lower():
spec_c = spec_a(wave) * spec_b(wave)
# Diagnostic plots - not for general use
# from matplotlib import pyplot as plt
# plt.plot(wave, spec_a(wave), label="spec_a")
# plt.plot(wave, spec_b(wave), label="spec_b")
# plt.plot(wave, spec_c, label="spec_c")
# plt.xlim(2.9e4, 4.2e4)
# plt.legend()
# plt.show()
elif "add" in action.lower():
spec_c = spec_a(wave) + spec_b(wave)
else:
raise ValueError(f"action {action} unknown")
new_source = SourceSpectrum(Empirical1D, points=wave, lookup_table=spec_c)
new_source.meta.update(spec_b.meta)
new_source.meta.update(spec_a.meta)
return new_source
[docs]def add_edge_zeros(tbl, wave_colname):
if isinstance(tbl, Table):
vals = np.zeros(len(tbl.colnames))
col_i = np.where(col == wave_colname for col in tbl.colnames)[0][0]
sgn = np.sign(np.diff(tbl[wave_colname][:2]))
vals[col_i] = tbl[wave_colname][0] * (1 - 1e-7 * sgn)
tbl.insert_row(0, vals)
vals[col_i] = tbl[wave_colname][-1] * (1 + 1e-7 * sgn)
tbl.insert_row(len(tbl), vals)
return tbl