"""Spectral broadening functions:
Used to convolve the model spectra for
- Stellar rotation broadening.
- Instrumental profile broadening.
Uses joblib.Memory to cache convolution results to skip repeated computation.
"""
from typing import Optional, Union
import joblib
import numpy as np
from astropy import constants as const
from joblib import Memory, Parallel, delayed
from numpy.core.multiarray import ndarray
from tqdm import tqdm
from eniric import config
from eniric.utilities import band_selector, cpu_minus_one, mask_between, wav_selector
# Cache convolution results.
memory = Memory(location=config.cache["location"], verbose=0)
c_kmps = const.c.to("km/s").value
num_cpu_minus_1 = cpu_minus_one()
[docs]@memory.cache(ignore=["num_procs", "verbose"])
def rotational_convolution(
wavelength: ndarray,
extended_wav: ndarray,
extended_flux: ndarray,
vsini: float,
*,
epsilon: float = 0.6,
normalize: bool = True,
num_procs: Optional[Union[int, joblib.parallel.Parallel]] = None,
verbose: bool = True,
) -> ndarray:
r"""Perform Rotational convolution.
Parameters
----------
wavelength: ndarray
Wavelength array.
extended_wav: ndarray
Extended wavelength array to avoid boundary issues.
extended_flux: ndarray
Photon flux for the extended wavelength array.
vsini: float
Rotational velocity in km/s.
epsilon: float
Limb darkening coefficient. Default is 0.6.
normalize: bool
Area normalize the broadening kernel. This corrects for kernel area with unequal wavelength spacing.
Default is True.
num_procs: int, None or joblib.parallel.Parallel.
Number of processes to use, n_job parameter in joblib.
If num_procs = 1, then a single core is used.
Can also be a joblib.parallel.Parallel instance. Default is None.
verbose: bool
Show the tqdm progress bar. Default is True.
Returns
-------
convolved_flux: ndarray
The convolved flux evaluated at the wavelength array.
"""
def element_rot_convolution(single_wav: float) -> float:
r"""Embarrassingly parallel part of rotational convolution.
Calculates the convolution value for a single pixel.
The parameters extended_wav, extended_flux, vsini, epsilon and normalize
are obtained from the outer scope.
Parameters
----------
single_wav: float
Wavelength to calculate the convolution at.
Returns
-------
sum_val: float
Sum of flux convolved for this wavelength.
"""
# Select all values such that they are within the fwhm limits
delta_lambda_l = single_wav * vsini / c_kmps
index_mask = mask_between(
extended_wav, single_wav - delta_lambda_l, single_wav + delta_lambda_l
)
flux_2convolve = extended_flux[index_mask]
rotation_profile = rotation_kernel(
extended_wav[index_mask] - single_wav,
delta_lambda_l,
vsini=vsini,
epsilon=epsilon,
)
sum_val = np.sum(rotation_profile * flux_2convolve)
if normalize:
# Correct for the effect of non-equidistant sampling
unitary_rot_val = np.sum(rotation_profile)
sum_val /= unitary_rot_val
return sum_val
if num_procs is None:
num_procs = num_cpu_minus_1
if vsini != 0:
tqdm_wav = tqdm(wavelength, disable=not verbose)
if isinstance(num_procs, int):
with Parallel(n_jobs=num_procs) as parallel:
convolved_flux = np.asarray(
parallel(delayed(element_rot_convolution)(wav) for wav in tqdm_wav)
)
else:
try:
# Assume num_procs is a joblib.parallel.Parallel.
convolved_flux = np.asarray(
num_procs(delayed(element_rot_convolution)(wav) for wav in tqdm_wav)
)
except TypeError:
raise TypeError(
"num_proc must be an int or joblib.parallel.Parallel. Not '{}'".format(
type(num_procs)
)
)
else:
# Skip convolution for vsini=0
if wavelength is extended_wav:
convolved_flux = extended_flux # No change
else:
# Interpolate to the new wavelength vector.
convolved_flux = np.interp(wavelength, extended_wav, extended_flux)
return convolved_flux
[docs]@memory.cache(ignore=["num_procs", "verbose"])
def resolution_convolution(
wavelength: ndarray,
extended_wav: ndarray,
extended_flux: ndarray,
R: float,
*,
fwhm_lim: float = 5.0,
normalize: bool = True,
num_procs: Optional[Union[int, joblib.parallel.Parallel]] = None,
verbose: bool = True,
) -> ndarray:
r"""Perform Resolution convolution.
Parameters
----------
wavelength: ndarray
Wavelength array.
extended_wav: ndarray
Extended wavelength array to avoid boundary issues.
extended_flux: ndarray
Photon flux for the extended wavelength array.
R: float
Resolution of Guassian instrumental profile.
fwhm_lim: float
FWHM limit for instrument broadening. Default is 5.0.
normalize: bool
Area normalize the broadening kernel. This corrects for kernel area with unequal wavelength spacing.
Default is True.
num_procs: int, None or joblib.parallel.Parallel.
Number of processes to use, n_job parameter in joblib.
If num_procs = 1, then a single core is used.
Can also be a joblib.parallel.Parallel instance.
verbose: bool
Show the tqdm progress bar. Default is True.
Returns
-------
convolved_flux: ndarray
The convolved flux evaluated at the wavelength array.
"""
def element_res_convolution(single_wav: float) -> float:
r"""Embarrassingly parallel component of resolution convolution.
Calculates the convolution value for a single pixel.
The parameters extended_wav, fwhm_lim, R and normalize
are obtained from the outer scope.
Parameters
----------
single_wav: float
Wavelength value to calculate convolution at.
Returns
-------
sum_val: float
Sum of flux convolved for this pixel/wavelength.
"""
fwhm = single_wav / R
# Mask of wavelength range within fwhm_lim* fwhm of wav
fwhm_space = fwhm_lim * fwhm
index_mask = mask_between(
extended_wav, single_wav - fwhm_space, single_wav + fwhm_space
)
flux_2convolve = extended_flux[index_mask]
# Gaussian Instrument Profile for given resolution and wavelength
instrument_profile = unitary_gaussian(
extended_wav[index_mask], single_wav, fwhm=fwhm
)
sum_val = np.sum(instrument_profile * flux_2convolve)
if normalize:
# Correct for the effect of convolution with non-equidistant positions
unitary_val = np.sum(instrument_profile)
sum_val /= unitary_val
return sum_val
tqdm_wav = tqdm(wavelength, disable=not verbose)
if num_procs is None:
num_procs = num_cpu_minus_1
if isinstance(num_procs, int):
with Parallel(n_jobs=num_procs) as parallel:
convolved_flux = np.asarray(
parallel(delayed(element_res_convolution)(wav) for wav in tqdm_wav)
)
else:
# Assume num_procs is joblib.parallel.Parallel.
try:
convolved_flux = np.asarray(
num_procs(delayed(element_res_convolution)(wav) for wav in tqdm_wav)
)
except TypeError:
raise TypeError(
"num_proc must be an int or joblib.parallel.Parallel. Not '{}'".format(
type(num_procs)
)
)
return convolved_flux
[docs]@memory.cache(ignore=["num_procs", "verbose"])
def convolution(
wav: ndarray,
flux: ndarray,
vsini: float,
R: float,
band: str = "All",
*,
epsilon: float = 0.6,
fwhm_lim: float = 5.0,
num_procs: Optional[Union[int, joblib.parallel.Parallel]] = None,
normalize: bool = True,
verbose: bool = True,
):
r"""Perform rotational then Instrumental broadening with convolutions.
Parameters
----------
wav: ndarray
Wavelength array.
flux: ndarray
Flux array.
vsini: float
Rotational velocity in km/s.
R: int
Resolution of instrumental profile.
band: str
Wavelength band to choose, default is "All".
epsilon: float
Limb darkening coefficient. Default is 0.6.
fwhm_lim: float
FWHM limit for instrument broadening. Default is 5.0.
normalize: bool
Area normalize the broadening kernel. This corrects for kernel area with unequal wavelength spacing.
Default is True.
num_procs: int, None or joblib.parallel.Parallel.
Number of processes to use, n_job parameter in joblib.
If num_procs = 1, then a single core is used.
Can also be a joblib.parallel.Parallel instance.
verbose: bool
Show the tqdm progress bar. Default is True.
Returns
-------
wav_band: ndarray
Wavelength for the selected band.
flux_band: ndarray
Original flux for the selected band.
flux_conv: ndarray
Convolved flux for the selected band.
"""
wav_band, flux_band = band_selector(wav, flux, band)
# Calculate FWHM at each end for the convolution
fwhm_min = wav_band[0] / R # fwhm at the extremes of vector
fwhm_max = wav_band[-1] / R
# performing convolution with rotation kernel
if verbose:
print("Starting the Rotation convolution for vsini={0:.2f}...".format(vsini))
delta_lambda_min = wav_band[0] * vsini / c_kmps
delta_lambda_max = wav_band[-1] * vsini / c_kmps
# widest wavelength bin for the rotation convolution
lower_lim = wav_band[0] - delta_lambda_min - fwhm_lim * fwhm_min
upper_lim = wav_band[-1] + delta_lambda_max + fwhm_lim * fwhm_max
wav_ext_rotation, flux_ext_rotation = wav_selector(wav, flux, lower_lim, upper_lim)
# wide wavelength bin for the resolution_convolution
lower_lim = wav_band[0] - fwhm_lim * fwhm_min
upper_lim = wav_band[-1] + fwhm_lim * fwhm_max
extended_wav, __ = wav_selector(wav, flux, lower_lim, upper_lim)
# rotational convolution
flux_conv_rot = rotational_convolution(
extended_wav,
wav_ext_rotation,
flux_ext_rotation,
vsini,
epsilon=epsilon,
num_procs=num_procs,
normalize=normalize,
verbose=verbose,
)
if verbose:
print("Starting the Resolution convolution...")
flux_conv_res = resolution_convolution(
wav_band,
extended_wav,
flux_conv_rot,
R,
fwhm_lim=fwhm_lim,
num_procs=num_procs,
normalize=normalize,
verbose=verbose,
)
return wav_band, flux_band, flux_conv_res
[docs]def unitary_gaussian(
x: Union[range, int, ndarray],
center: Union[float, int, str],
fwhm: Union[float, int, str],
) -> ndarray:
"""Gaussian kernal of area 1.
Parameters
----------
x: array-like
Position array.
center: float
Central position of Gaussian.
fwhm: float
Full Width at Half Maximum.
Returns
-------
kernel: array-like
Gaussian kernel.
"""
if not isinstance(fwhm, (np.float, np.int)):
raise TypeError("The fwhm value is not a number, {0}".format(type(fwhm)))
if not isinstance(center, (np.float, np.int)):
raise TypeError("The center value is not a number, {0}".format(type(center)))
if not isinstance(x, np.ndarray):
raise TypeError
sigma = np.abs(fwhm) / (2 * np.sqrt(2 * np.log(2)))
amp = 1.0 / (sigma * np.sqrt(2 * np.pi))
tau = -((x - center) ** 2) / (2 * (sigma ** 2))
kernel = amp * np.exp(tau)
return kernel
[docs]def rotation_kernel(
delta_lambdas: ndarray, delta_lambda_l: float, vsini: float, epsilon: float
) -> ndarray:
r"""Rotation kernel for a given line center.
Parameters
----------
delta_lambdas: array
Wavelength difference from the line center lambda.
delta_lambda_l: float
Maximum line shift of line center by vsini.
vsini: float
Projected rotational velocity in km/s.
epsilon: float
Linear limb-darkening coefficient, between 0 and 1.
Returns
-------
kernel: array
Rotational kernel.
Notes
-----
Gray, D. F. (2005). The Observation and Analysis of Stellar Photospheres. 3rd ed. Cambridge University Press.
"""
denominator = np.pi * vsini * (1.0 - epsilon / 3.0)
lambda_ratio_sqr = (delta_lambdas / delta_lambda_l) ** 2.0
c1 = 2.0 * (1.0 - epsilon) / denominator
c2 = 0.5 * np.pi * epsilon / denominator
kernel = c1 * np.sqrt(1.0 - lambda_ratio_sqr) + c2 * (1.0 - lambda_ratio_sqr)
return kernel
def oned_circle_kernel(x: ndarray, center: float, fwhm: float):
"""Calculate the convolution kernel for a circular fiber.
Parameters
----------
x: array
Value to evaluate kernel at.
center: float
Center of kernel.
fwhm: float
FWHM of desired kernel.
Returns
-------
Collapsed circle kernel.
Notes
-----
Tries to represent the broadening by the fiber of a fiber feed spectrograph.
Artigau 2018 - stated this is mathematically equivalent to a cosine between -pi/2 and pi/2.
"""
fwhm_scale = 2.094_395_1 # Numerically derived
A = 1 # Amplitude
B = fwhm_scale / fwhm # Scale to give specific fwhm
kernel = A * np.cos(B * (x - center))
# Limit to main cosine lobe only
upper_xi = center + np.pi / 2 / B
lower_xi = center - np.pi / 2 / B
mask = mask_between(x, lower_xi, upper_xi)
kernel[~mask] = 0
return kernel