#!/usr/bin/env python
"""
phoenix_precision.py
--------------------
Script to generate RV precision of synthetic spectra, see :ref:`Calculating-Precisions`.
"""
import argparse
import itertools
import os
import warnings
from typing import Any, List, Optional, Tuple
import numpy as np
from astropy import units as u
from astropy.units import Quantity
from numpy import ndarray
from eniric import config
from eniric.atmosphere import Atmosphere
from eniric.broaden import convolution
from eniric.corrections import correct_artigau_2018
from eniric.precision import quality, rv_precision
from eniric.resample import log_resample
from eniric.snr_normalization import snr_constant_band
from eniric.utilities import (
band_middle,
cpu_minus_one,
doppler_shift_flux,
load_aces_spectrum,
load_btsettl_spectrum,
)
num_cpu_minus_1 = cpu_minus_one()
ref_choices = ["SELF"]
ref_choices.extend(config.bands["all"])
def _parser():
"""Take care of all the argparse stuff."""
parser = argparse.ArgumentParser(
description="Calculate precision and quality for synthetic spectra."
)
parser.add_argument(
"-t",
"--temp",
type=int,
default=[3900],
help="Temperature, default=[3900].",
nargs="+",
)
parser.add_argument(
"-l",
"--logg",
type=float,
default=[4.5],
help="Logg, default = [4.5].",
nargs="+",
)
parser.add_argument(
"-m",
"--metal",
type=float,
default=[0.0],
help="Metallicity, default=[0.0].",
nargs="+",
)
parser.add_argument(
"-a",
"--alpha",
type=float,
default=[0.0],
help="Alpha, default=[0.0].",
nargs="+",
)
parser.add_argument(
"-s",
"--sampling",
type=float,
default=[3.0],
help="Sampling, default=[3.0].",
nargs="+",
)
parser.add_argument(
"-r",
"--resolution",
type=float,
default=[50000],
help="Instrumental resolution, default=[50000]",
nargs="+",
)
parser.add_argument(
"-v",
"--vsini",
type=float,
default=[1.0],
help="Rotational Velocity, default = [1.0]",
nargs="+",
)
parser.add_argument(
"-b",
"--bands",
type=str,
default=["J"],
choices=config.bands["all"],
help="Wavelength bands to select, default=['J'].",
nargs="+",
)
parser.add_argument(
"--model",
type=str,
default="aces",
choices=["aces", "btsettl", "phoenix"],
help="Spectral models to use, default='aces'.",
)
parser.add_argument(
"--snr", help="Mid-band SNR scaling, default=100.", default=100, type=float
)
parser.add_argument(
"--ref_band",
help="SNR reference band, default='J'. "
"'self' scales each band relative to the SNR itself.",
choices=ref_choices,
default="J",
type=str,
)
parser.add_argument(
"--num_procs",
help="Number of processors to use, default = (Total cores - 1)",
default=num_cpu_minus_1,
type=int,
)
parser.add_argument(
"-o",
"--output",
help="Filename for result file, default='precisions.csv'.",
default="precisions.csv",
type=str,
)
parser.add_argument(
"--rv",
help="Radial velocity value, default=[0.0]",
default=[0.0],
type=float,
nargs="+",
)
parser.add_argument(
"--add_rv", help="Include a radial velocity shift.", action="store_true"
)
parser.add_argument(
"--air", help="Convert to air wavelengths.", action="store_true"
)
parser.add_argument(
"--correct",
help="Apply Artigau et al. (2018) RV band corrections.",
action="store_true",
)
parser.add_argument("-V", "--verbose", help="Enable verbose.", action="store_true")
parser.add_argument(
"--disable_normalization",
help="Disable the convolution normalization.",
action="store_true",
)
return parser.parse_args()
[docs]def do_analysis(
star_params,
vsini: float,
R: float,
band: str,
sampling: float = 3.0,
conv_kwargs=None,
snr: float = 100.0,
ref_band: str = "J",
rv: float = 0.0,
air: bool = False,
model: str = "aces",
verbose: bool = False,
) -> Tuple[Quantity, ...]:
"""Calculate RV precision and Quality for specific parameter set.
Parameters
----------
star_param:
Stellar parameters [temp, logg, feh, alpha] for phoenix model libraries.
vsini: float
Stellar equatorial rotation.
R: float
Instrumental resolution.
band: str
Spectral band.
sampling: float (default=False)
Per pixel sampling (after convolutions)
conv_kwargs: Dict (default=None)
Arguments specific for the convolutions,
'epsilon', 'fwhm_lim', 'num_procs', 'normalize', 'verbose'.
snr: float (default=100)
SNR normalization level. SNR per pixel and the center of the ref_band.
ref_band: str (default="J")
Reference band for SNR normalization.
rv: float
Radial velocity in km/s (default = 0.0).
air: bool
Get model in air wavelengths (default=False).
model: str
Name of synthetic library (aces, btsettl) to use. Default = 'aces'.
verbose:
Enable verbose (default=False).
Returns
-------
q: astropy.Quality
Spectral quality.
result_1: astropy.Quality
RV precision under condition 1.
result_2 : astropy.Quality
RV precision under condition 2.
result_3: astropy.Quality
RV precision under condition 3.
Notes
-----
We apply the radial velocity doppler shift after
- convolution (rotation and resolution)
- resampling
- SNR normalization.
in this way the RV only effects the precision due to the telluric mask interaction.
Physically the RV should be applied between the rotational and instrumental convolution
but we assume this effect is negligible.
"""
if conv_kwargs is None:
conv_kwargs = {
"epsilon": 0.6,
"fwhm_lim": 5.0,
"num_procs": num_cpu_minus_1,
"normalize": True,
"verbose": verbose,
}
if ref_band.upper() == "SELF":
ref_band = band
model = check_model(model)
if model == "aces":
wav, flux = load_aces_spectrum(star_params, photons=True, air=air)
elif model == "btsettl":
wav, flux = load_btsettl_spectrum(star_params, photons=True, air=air)
else:
raise Exception("Invalid model name reached.")
wav_grid, sampled_flux = convolve_and_resample(
wav, flux, vsini, R, band, sampling, **conv_kwargs
)
# Doppler shift
try:
if rv != 0:
sampled_flux = doppler_shift_flux(wav_grid, sampled_flux, vel=rv)
except Exception as e:
print("Doppler shift was unsuccessful")
raise e
# Scale normalization for precision
wav_ref, sampled_ref = convolve_and_resample(
wav, flux, vsini, R, ref_band, sampling, **conv_kwargs
)
snr_normalize = snr_constant_band(
wav_ref, sampled_ref, snr=snr, band=ref_band, sampling=sampling, verbose=verbose
)
sampled_flux = sampled_flux / snr_normalize
if (ref_band == band) and verbose:
mid_point = band_middle(ref_band)
index_ref = np.searchsorted(
wav_grid, mid_point
) # searching for the index closer to 1.25 micron
snr_estimate = np.sqrt(np.sum(sampled_flux[index_ref - 1 : index_ref + 2]))
print(
"\tSanity Check: The S/N at {0:4.02} micron = {1:4.2f}, (should be {2:g}).".format(
mid_point, snr_estimate, snr
)
)
# Load Atmosphere for this band.
atm = Atmosphere.from_band(band=band, bary=True).at(wav_grid)
assert np.allclose(atm.wl, wav_grid), "The atmosphere does not cover the wav_grid"
# Spectral Quality/Precision
q = quality(wav_grid, sampled_flux)
# Precision given by the first condition:
result_1 = rv_precision(wav_grid, sampled_flux, mask=None)
# Precision as given by the second condition
result_2 = rv_precision(wav_grid, sampled_flux, mask=atm.mask)
# Precision as given by the third condition: M = T**2
result_3 = rv_precision(wav_grid, sampled_flux, mask=atm.transmission ** 2)
# Turn quality back into a Quantity (to give it a .value method)
q = q * u.dimensionless_unscaled
return q, result_1, result_2, result_3
[docs]def convolve_and_resample(
wav: ndarray,
flux: ndarray,
vsini: float,
R: float,
band: str,
sampling: float,
**conv_kwargs,
) -> Tuple[ndarray, ndarray]:
"""Convolve and resample functions together.
Returns
-------
wav_grid: ndarray
Resampled wavelength array
sampled_flux: ndarray
Convolved and resampled flux array
"""
wav_band, __, convolved_flux = convolution(wav, flux, vsini, R, band, **conv_kwargs)
# Re-sample to sampling per resolution element.
wav_grid = log_resample(wav_band, sampling, R)
sampled_flux = np.interp(wav_grid, wav_band, convolved_flux)
return wav_grid, sampled_flux
[docs]def get_already_computed(filename: str, add_rv: bool = False) -> List[str]:
"""Get the string of already computed model/parameters from the result file."""
param_lines = []
with open(filename, "r") as f:
for line in f:
if add_rv:
# First 10 columns
param_lines.append(select_csv_columns(line, ncols=10))
else:
# First 9 columns
param_lines.append(select_csv_columns(line, ncols=9))
# Strip any spaces
param_lines = [strip_whitespace(value) for value in param_lines]
return param_lines
[docs]def is_already_computed(
computed_values: List[str],
model,
pars,
add_rv: bool = False,
correct: bool = False,
verbose=False,
) -> bool:
"""Check if any combinations have already been preformed.
Correct is boolean for applied Artigau correction."""
model_par_str_args = model_format_args(model, pars)
rv_template = "{0:5d},{1:3.01f},{2:4.01f},{3:3.01f},{4:s},{5:3d}k,{6:4.01f},{7:3.01f},{8:3.01f},{9:1d}"
no_rv_template = (
"{0:5d},{1:3.01f},{2:4.01f},{3:3.01f},{4:s},{5:3d}k,{6:4.01f},{7:3.01f},{8:1d}"
)
if add_rv:
if len(model_par_str_args) != 9:
raise ValueError("model_par_str_args is incorrect length")
idenifying_line = strip_whitespace(
rv_template.format(*model_par_str_args, int(correct))
)
else:
model_par_str_args = model_par_str_args[:8]
if len(model_par_str_args) != 8:
raise ValueError("model_par_str_args is incorrect length")
idenifying_line = strip_whitespace(
no_rv_template.format(*model_par_str_args, int(correct))
)
result = idenifying_line in computed_values
if result and verbose:
print(model_par_str_args, "already computed")
return result
def strip_whitespace(line: str) -> str:
return "".join(line.split())
[docs]def select_csv_columns(line: str, ncols: int = 8) -> str:
"""Select first ncols in a line from a csv.
Parameters
----------
ncols: int
Number of column to select.
Returns
-------
selected_cols: str
Selected ncols of csv.
"""
return ",".join(line.split(",")[:ncols])
[docs]def check_model(model: str) -> str:
"""Check model is 'aces' or 'btsettl'.
Parameters
----------
model: str
Model name. Should be either 'aces' or 'btsettl'.
Returns
-------
model: str
Valid model output; either 'aces' or 'btsettl'.
"""
if model == "phoenix":
warnings.warn(
"The model name 'phoenix' is depreciated, use 'aces' instead.",
DeprecationWarning,
)
model = "aces"
if model not in ["aces", "btsettl"]:
raise ValueError(
"Model name error of '{}'. Valid choices are 'aces' and 'btsettl'".format(
model
)
)
return model
if __name__ == "__main__":
args = _parser()
# check bt-settl parameters
if args.model == "btsettl":
if (args.metal != [0]) or (args.alpha != [0]):
raise ValueError(
"You cannot vary metallicity and alpha for BT-Settl, remove these flags."
)
try:
normalize = not args.disable_normalization
except AttributeError:
normalize = True
try:
num_procs = args.num_procs
except AttributeError:
num_procs = num_cpu_minus_1
conv_kwargs = {
"epsilon": 0.6,
"fwhm_lim": 5.0,
"num_procs": num_procs,
"normalize": normalize,
"verbose": args.verbose,
}
snr = args.snr
air = args.air
if "ALL" in args.bands:
args.bands.extend(config.bands["all"])
args.bands = set(args.bands) # Unique
ref_band = args.ref_band
# Load the relevant spectra
if args.model == "aces":
models_list = itertools.product(args.temp, args.logg, args.metal, args.alpha)
else:
models_list = itertools.product(args.temp, args.logg, [0], [0])
if (args.rv != [0.0]) and (not args.add_rv):
raise ValueError("Need to include --add_rv flag if a non-zero RV is used.")
if not os.path.exists(args.output):
with open(args.output, "a") as f:
header = header_row(add_rv=args.add_rv)
f.write(header)
# Find all model/parameter combinations already computed.
# To later skip recalculation.
computed_values = get_already_computed(args.output, args.add_rv)
# Iteration over stellar model (temp. logg, feh, alpha) then an inner loop over parameters (R, vsini, sampling, rv)
for model in models_list:
# Create generator for params_list
params_list = itertools.product(
args.resolution, args.bands, args.vsini, args.sampling, args.rv
)
for (R, band, vsini, sample, rv) in params_list:
pars = (R, band, vsini, sample, rv)
if is_already_computed(
computed_values,
model,
pars,
add_rv=args.add_rv,
correct=args.correct,
verbose=args.verbose,
):
# skipping the recalculation
continue
else:
precision_result: Tuple[Any, ...] = do_analysis(
model,
vsini=vsini,
R=R,
band=band,
conv_kwargs=conv_kwargs,
snr=snr,
ref_band=ref_band,
sampling=sample,
air=air,
model=args.model,
)
result: List[Optional[float]] = [
round(res.value, 1) if res is not None else None
for res in precision_result
]
if args.correct:
# Apply Artigau 2018 Corrections
corr_value = correct_artigau_2018(band)
for ii, res in enumerate(result):
if (ii > 0) and (result[ii] is not None): # Not the quality
result[ii] = res * corr_value
result[0] = int(result[0]) if result[0] is not None else None
if args.add_rv:
output_template = (
"{0:5d},{1:3.01f},{2:4.01f},{3:3.01f},{4:s},{5:3d}k,"
"{6:4.01f},{7:3.01f},{8:3.01f},{9:1d},{10:6d},{11:5.01f},"
"{12:5.01f},{13:5.01f}\n"
)
output_model_args = model_format_args(model, pars)
else:
output_template = (
"{0:5d},{1:3.01f},{2:4.01f},{3:3.01f},{4:s},{5:3d}k,"
"{6:4.01f},{7:3.01f},{8:1d},{9:6d},{10:5.01f},{11:5.01f},"
"{12:5.01f}\n"
)
output_model_args = model_format_args(model, pars)[:8]
linetowite = output_template.format(
*output_model_args,
int(args.correct),
result[0],
result[1],
result[2],
result[3],
)
with open(args.output, "a") as f:
f.write(strip_whitespace(linetowite) + "\n") # Make csv only
if args.verbose:
print(
"`{1}` completed successfully:\n"
"\tYou shall find you results in '{0}'.".format(
args.output, os.path.basename(__file__)
)
)