import warnings
from os.path import join
from typing import List, Optional
import numpy as np
from astropy import constants as const
from numpy import ndarray
import eniric.io_module as io
from eniric import config
from eniric.broaden import resolution_convolution
from eniric.utilities import band_limits
[docs]class Atmosphere(object):
"""Atmospheric transmission object.
Stores wavelength and atmospheric transmission arrays.
Enables telluric masking and accounting for barycentric motion.
Attributes
----------
wl: ndarray
Wavelength array.
transmission: ndarray
Atmospheric transmission (between 0 and 1).
std: ndarray
Standard deviation of transmission.
mask: ndarray
Transmission mask (1's are kept).
shifted: bool
Indicate shifted mask. Default is False.
verbose: bool
Enable verbose. Default is False.
Constructors
------------
from_file(atmmodel)
Read in atmospheric model and prepare.
from_band(band, bary=False)
Read in atmospheric model for given band.
Methods
-------
to_file(fname, header, fmt)
Save the atmospheric model to a txt file.
at(wave)
Return the transmission value at the closest points to wave.
wave_select(wl_min, wl_max)
Slice Atmosphere between two wavelengths.
band_select(band)
Slice Atmosphere to a given band.
copy()
Make a copy of atmosphere object.
mask_transmission(depth)
Mask the transmission below given depth. e.g. 2%
barycenter_broaden(rv, consecutive_test)
Sweep telluric mask symmetrically by +/- a velocity.
broaden(resolution, *kwargs)
Instrument broadening of the atmospheric transmission profile.
Configuration
-------------
Two things can be set for the Atmosphere class in the `config.yaml` file.
The path to the atmosphere data
e.g.
paths:
atmmodel: "path/to/atmmodel/directory"
The name for the atmosphere model *.dat file
atmmodel:
base: "Average_TAPAS_2014"
"""
def __init__(
self,
wavelength,
transmission,
mask=None,
std=None,
shifted=False,
verbose=False,
):
assert len(wavelength) == len(
transmission
), "Wavelength and transmission do not match length."
self.wl = np.asarray(wavelength)
self.transmission = np.asarray(transmission)
if std is None:
self.std = np.zeros_like(wavelength)
else:
self.std = np.asarray(std)
if mask is None:
self.mask = np.ones_like(wavelength, dtype=bool)
else:
self.mask = np.asarray(mask, dtype=bool)
self.shifted = shifted
self.verbose = verbose
[docs] @classmethod
def from_file(cls, atmmodel: str):
"""Read in atmospheric model and prepare.
Alternate constructor for Atmosphere.
Parameters
----------
atmmodel: str
Name of atmosphere file.
"""
wav_atm, flux_atm, std_flux_atm, mask_atm = io.pdread_4col(atmmodel)
wav_atm = wav_atm / 1e3 # conversion from nanometers to micrometers
mask_atm = np.array(mask_atm, dtype=bool)
# We do not use the std from the year atmosphere but need it for compatibility.
shifted = True if "_bary" in atmmodel else False
return cls(
wavelength=wav_atm,
transmission=flux_atm,
mask=mask_atm,
std=std_flux_atm,
shifted=shifted,
)
[docs] @classmethod
def from_band(cls, band: str, bary: bool = False):
"""Read in atmospheric model for given band.
Alternate constructor for Atmosphere. Base on "base" path in config.yaml.
Parameters
----------
band: str
Name of atmosphere file.
bary: bool
Barycentric shift the mask.
"""
extension = "_bary.dat" if bary else ".dat"
atmmodel = join(
config.pathdir,
config.paths["atmmodel"],
"{0}_{1}{2}".format(config.atmmodel["base"], band, extension),
)
try:
# Try find the band file
atm = cls.from_file(atmmodel)
except IOError:
warnings.warn(
"""Could not find band file for band {0}.
It is recommend to create this using
`split_atmosphere.py -b {0}`
`barycenter_broaden_atmmodel.py -b {0}`
Trying to load main atmosphere file for now. (will be slower).""".format(
band
)
)
full_model = join(
config.pathdir,
config.paths["atmmodel"],
"{0}.dat".format(config.atmmodel["base"]),
)
atm = cls.from_file(full_model)
# Shorten to band
atm = atm.wave_select(*band_limits(band))
if bary:
atm.barycenter_broaden(consecutive_test=True)
return atm
[docs] def to_file(
self, fname: str, header: Optional[List[str]] = None, fmt: str = "%11.8f"
):
"""Save the atmospheric model to a txt file.
Converts micron back into nanometers to be consistent with from_file().
Parameters
----------
fname: str
Name of atmosphere file to save to.
header:
Header lines to add.
fmt: str
String formatting
"""
if header is None:
header = ["# atm_wav(nm)", "atm_flux", "atm_std_flux", "atm_mask"]
return io.pdwrite_cols(
fname,
self.wl * 1000,
self.transmission,
self.std,
self.mask.astype(int),
header=header,
float_format=fmt,
)
def __getitem__(self, item):
"""Index Atmosphere by returning a Atmosphere with indexed components."""
return Atmosphere(
wavelength=self.wl[item],
transmission=self.transmission[item],
mask=self.mask[item],
std=self.std[item],
)
[docs] def at(self, wave):
"""Return the transmission value at the closest points to wave.
This assumes that the atmosphere model is sampled at a higher rate
than the stellar spectra.
For instance, the default Tapas model has a sampling if 10 compared to 3.
Parameters
----------
wave: ndarray
Wavelengths at which to return closest atmosphere values.
"""
# Getting the wav, flux and mask values from the atm model
# that are the closest to the stellar wav values, see
# https://stackoverflow.com/questions/2566412/find-nearest-value-in-numpy-array
index_atm = np.searchsorted(self.wl, wave)
wl_len = len(self.wl)
# replace indexes outside the array, at the very end, by the value at the very end
# index_atm = [index if(index < len(wav_atm)) else len(wav_atm)-1 for index in index_atm]
index_mask = index_atm >= wl_len # find broken indices
index_atm[index_mask] = wl_len - 1 # replace with index of end.
return self[index_atm]
[docs] def wave_select(self, wl_min, wl_max):
"""Slice Atmosphere between two wavelengths."""
wl_mask = (self.wl < wl_max) & (self.wl > wl_min)
return self[wl_mask]
[docs] def band_select(self, band):
"""Slice Atmosphere to a given Band."""
wl_min, wl_max = band_limits(band)
return self.wave_select(wl_min, wl_max)
[docs] def copy(self):
"""Make a copy of atmosphere object."""
return Atmosphere(
wavelength=self.wl.copy(),
transmission=self.transmission.copy(),
mask=self.mask.copy(),
std=self.std.copy(),
)
[docs] def mask_transmission(self, depth: float = 2.0) -> None:
"""Mask the transmission below given depth. e.g. 2%.
Updates the mask.
Parameters
----------
depth: float
Telluric line depth percentage to mask out. Default is 2.0.
E.g. depth=2 will mask transmission deeper than 2%.
"""
cutoff = 1 - depth / 100.0
self.mask = self.transmission >= cutoff
[docs] def barycenter_broaden(self, rv: float = 30.0, consecutive_test: bool = False):
"""Sweep telluric mask symmetrically by +/- a velocity.
Updates the objects mask.
Parameters
----------
rv: float
Velocity to extend masks in km/s. Default is 30 km/s.
consecutive_test: bool
Checks for 3 consecutive zeros to mask out transmission. Default is False.
"""
if self.shifted:
warnings.warn(
"Detected that 'shifted' is already True. "
"Check that you want to rv extend the masks again."
)
rv_mps = rv * 1e3 # Convert from km/s into m/s
shift_amplitudes = self.wl * rv_mps / const.c.value
# Operate element wise
blue_shifts = self.wl - shift_amplitudes
red_shifts = self.wl + shift_amplitudes
bary_mask = []
for (blue_wl, red_wl, mask) in zip(blue_shifts, red_shifts, self.mask):
if mask == 0:
this_mask_value = False
else:
# np.searchsorted is faster then the boolean masking wavelength range
# It returns index locations to put the min/max doppler-shifted values
slice_limits = np.searchsorted(self.wl, [blue_wl, red_wl])
slice_limits = [
index if (index < len(self.wl)) else len(self.wl) - 1
for index in slice_limits
] # Fix searchsorted end index
mask_slice = self.mask[slice_limits[0] : slice_limits[1]]
if consecutive_test:
# Mask value False if 3 or more consecutive zeros in slice.
len_consec_zeros = consecutive_truths(~mask_slice)
if np.all(
~mask_slice
): # All pixels of slice is zeros (shouldn't get here)
this_mask_value = False
elif np.max(len_consec_zeros) >= 3:
this_mask_value = False
else:
this_mask_value = True
if np.sum(~mask_slice) > 3:
if self.verbose:
print(
(
"There were {0}/{1} zeros in this "
"barycentric shift but None were 3 consecutive!"
).format(np.sum(~mask_slice), len(mask_slice))
)
else:
this_mask_value = np.bool(
np.product(mask_slice)
) # Any 0s will make it 0
# Checks
if not this_mask_value:
assert np.any(~mask_slice)
else:
if not consecutive_test:
assert np.all(mask_slice)
bary_mask.append(this_mask_value)
self.mask = np.asarray(bary_mask, dtype=np.bool)
self.shifted = True
[docs] def broaden(self, resolution: float, fwhm_lim: float = 5, num_procs=None):
"""Instrument broadening of the atmospheric transmission profile.
This does not change any created masks.
Parameters
----------
resolution: float
Instrumental resolution R.
fwhm_lim: int/float
Number of FWHM to extend the wings of the convolution kernel.
num_procs: Optional[int]
Number of processors to use. Default = total processors - 1
"""
self.transmission = resolution_convolution(
wavelength=self.wl,
extended_wav=self.wl,
extended_flux=self.transmission,
R=resolution,
fwhm_lim=fwhm_lim,
num_procs=num_procs,
normalize=True,
)
[docs]def consecutive_truths(condition: ndarray) -> ndarray:
"""Length of consecutive true values in an bool array.
Parameters
----------
condition: ndarray of bool
True False array of a condition.
Returns
-------
len_consecutive: ndarray of ints
Array of lengths of consecutive true values of the condition.
Notes
-----
Solution found at http://stackoverflow.com/questions/24342047
"""
if not np.any(condition): # No match to condition
return np.array([0])
else:
unequal_consec = np.concatenate(
([condition[0]], condition[:-1] != condition[1:], [True])
)
where_changes = np.where(unequal_consec)[0] # indices where condition changes
len_consecutive = np.diff(where_changes)[
::2
] # step through every second to get the "True" lengths.
return len_consecutive