from astropy.coordinates import EarthLocation
from astropy.time import Time
from scipy import optimize as opt
import toolviper.utils.logger as logger
import astropy.units as units
import xarray as xr
from astrohack.io.position_mds import AstrohackPositionFile
from astrohack.utils.text import (
get_data_name,
create_dataset_label,
)
from astrohack.utils.conversion import convert_unit, hadec_to_elevation
from astrohack.utils.algorithms import least_squares, phase_wrapping
from astrohack.utils.constants import *
[docs]
def locit_separated_chunk(locit_parms: dict, output_mds: AstrohackPositionFile):
"""
This is the chunk function for locit when treating each DDI separately
Args:
locit_parms: the locit parameter dictionary
output_mds: Output mds file onto which to add results
Returns:
xds save to disk in the .zarr format
"""
input_xdt = locit_parms["xdt_data"]
field_id, time, delays, freq = _get_data_from_locit_xds(
input_xdt, locit_parms["polarization"]
)
ant_key = locit_parms["this_ant"]
ddi_key = locit_parms["this_ddi"]
antenna_info = input_xdt.parent.attrs["antenna_info"]
source_dict = input_xdt.parent.parent.attrs["source_dict"]
if _has_valid_data(field_id, time, delays, ant_key, ddi=ddi_key):
coordinates, delays, lst, elevation_limit, nin = _build_filtered_arrays(
field_id, time, delays, locit_parms, antenna_info, source_dict
)
if _elevation_ok(nin, locit_parms["this_ant"]):
fit, variance, converged = _fit_data(coordinates, delays, locit_parms)
if converged:
model, chi_squared = _compute_chi_squared(
delays,
fit,
coordinates,
locit_parms["fit_kterm"],
locit_parms["fit_delay_rate"],
)
out_xds = _create_output_xds(
coordinates,
lst,
delays,
fit,
variance,
chi_squared,
model,
locit_parms,
freq,
elevation_limit,
antenna_info,
)
output_mds.add_node(out_xds, [ant_key, ddi_key])
[docs]
def locit_combined_chunk(locit_parms: dict, output_mds: AstrohackPositionFile):
"""
This is the chunk function for locit when we are combining the DDIs for an antenna for a single solution
Args:
locit_parms: the locit parameter dictionary
output_mds: Output mds file onto which to add results
Returns:
xds save to disk in the .zarr format
"""
ant_xdt = locit_parms["xdt_data"]
antenna_info = ant_xdt.attrs["antenna_info"]
source_dict = ant_xdt.parent.attrs["source_dict"]
ant_key = locit_parms["this_ant"]
delay_list = []
time_list = []
field_list = []
freq_list = []
for ddi, xdt_data in ant_xdt.items():
this_field_id, this_time, this_delays, freq = _get_data_from_locit_xds(
xdt_data, locit_parms["polarization"]
)
freq_list.append(freq)
field_list.append(this_field_id)
time_list.append(this_time)
delay_list.append(this_delays)
delays = np.concatenate(delay_list)
time = np.concatenate(time_list)
field_id = np.concatenate(field_list)
if _has_valid_data(field_id, time, delays, locit_parms["this_ant"]):
coordinates, delays, lst, elevation_limit, nin = _build_filtered_arrays(
field_id, time, delays, locit_parms, antenna_info, source_dict
)
if _elevation_ok(nin, locit_parms["this_ant"]):
fit, variance, converged = _fit_data(coordinates, delays, locit_parms)
if converged:
model, chi_squared = _compute_chi_squared(
delays,
fit,
coordinates,
locit_parms["fit_kterm"],
locit_parms["fit_delay_rate"],
)
out_xds = _create_output_xds(
coordinates,
lst,
delays,
fit,
variance,
chi_squared,
model,
locit_parms,
freq_list,
elevation_limit,
antenna_info,
)
output_mds.add_node(out_xds, [ant_key])
[docs]
def locit_difference_chunk(locit_parms: dict, output_mds: AstrohackPositionFile):
"""
This is the chunk function for locit when we are combining two DDIs for an antenna for a single solution by using
the difference in phase between the two DDIs of different frequencies
Args:
locit_parms: the locit parameter dictionary
output_mds: Output mds file onto which to add results
"""
ant_xdt = locit_parms["xdt_data"]
antenna_info = ant_xdt.attrs["antenna_info"]
source_dict = ant_xdt.parent.attrs["source_dict"]
ant_key = locit_parms["this_ant"]
ddi_list = list(ant_xdt.keys())
nddis = len(ddi_list)
if nddis != 2:
msg = f"The difference method support only 2 DDIs, {nddis} DDIs provided for Antenna {ant_key.split('_')[1]}."
logger.error(msg)
return None
ddi_0 = _get_data_from_locit_xds(
ant_xdt[ddi_list[0]],
locit_parms["polarization"],
get_phases=True,
split_pols=True,
)
ddi_1 = _get_data_from_locit_xds(
ant_xdt[ddi_list[1]],
locit_parms["polarization"],
get_phases=True,
split_pols=True,
)
time, field_id, delays, freq = _delays_from_phase_differences(ddi_0, ddi_1)
if _has_valid_data(field_id, time, delays, locit_parms["this_ant"]):
coordinates, delays, lst, elevation_limit, nin = _build_filtered_arrays(
field_id, time, delays, locit_parms, antenna_info, source_dict
)
if _elevation_ok(nin, locit_parms["this_ant"]):
fit, variance, converged = _fit_data(coordinates, delays, locit_parms)
if converged:
model, chi_squared = _compute_chi_squared(
delays,
fit,
coordinates,
locit_parms["fit_kterm"],
locit_parms["fit_delay_rate"],
)
out_xds = _create_output_xds(
coordinates,
lst,
delays,
fit,
variance,
chi_squared,
model,
locit_parms,
freq,
elevation_limit,
antenna_info,
)
output_mds.add_node(out_xds, [ant_key])
def _delays_from_phase_differences(ddi_0, ddi_1):
"""
Compute delays from the difference in phase between two DDIs of different frequencies
Args:
ddi_0: First DDI
ddi_1: Second DDI
Returns:
Matched times, matched field ids, matched phase difference delays, difference in frequency
"""
freq = ddi_0[3] - ddi_1[3]
if freq > 0:
pos_time, pos_phase = ddi_0[1:3]
neg_time, neg_phase = ddi_1[1:3]
fields = ddi_0[0]
elif freq < 0:
pos_time, pos_phase = ddi_1[1:3]
neg_time, neg_phase = ddi_0[1:3]
freq *= -1
fields = ddi_1[0]
else:
msg = f"The two DDIs must have different frequencies"
logger.error(msg)
raise ValueError(msg)
if isinstance(fields, list):
time = []
field_id = []
phase = []
for i_pol in range(len(fields)):
this_time, this_field_id, this_phase = _match_times_and_phase_difference(
pos_time[i_pol],
neg_time[i_pol],
pos_phase[i_pol],
neg_phase[i_pol],
fields[i_pol],
)
time.append(this_time)
field_id.append(this_field_id)
phase.append(this_phase)
time = np.concatenate(time)
field_id = np.concatenate(field_id)
phase = np.concatenate(phase)
else:
time, field_id, phase = _match_times_and_phase_difference(
pos_time, neg_time, pos_phase, neg_phase, fields
)
delays = phase / twopi / freq
return time, field_id, delays, freq
def _match_times_and_phase_difference(
pos_time, neg_time, pos_phase, neg_phase, fields, tolerance=1e-8
):
"""
match times and compute the phase differences for the simple case, calls _different_times for the complicated case
Args:
pos_time: Time for the positive phase
neg_time: Time for the negative phase
pos_phase: Positive phase
neg_phase: Negative phase
fields: Field ids
tolerance: Tolerance in time to match time arrays
Returns:
Matched times, matched field ids, -pi, pi wrapped matched phase difference
"""
n_pos_time, n_neg_time = len(pos_time), len(neg_time)
if n_pos_time == n_neg_time:
if np.all(
np.isclose(pos_time, neg_time, tolerance)
): # this the simplest case times are already matched!
return pos_time, fields, phase_wrapping(pos_phase - neg_phase)
else:
return _different_times(
pos_time, neg_time, pos_phase, neg_phase, fields, tolerance
)
else:
return _different_times(
pos_time, neg_time, pos_phase, neg_phase, fields, tolerance
)
def _different_times(pos_time, neg_time, pos_phase, neg_phase, fields, tolerance=1e-8):
"""
match times and compute the phase differences for the complicated case
Args:
pos_time: Time for the positive phase
neg_time: Time for the negative phase
pos_phase: Positive phase
neg_phase: Negative phase
fields: Field ids
tolerance: Tolerance in time to match time arrays
Returns:
Matched times, matched field ids, -pi, pi wrapped matched phase difference
"""
# This solution is not optimal but numpy does not have a task for it, if it ever becomes a bottleneck we can JIT it
out_times = np.sort(
[time for time in pos_time if np.isclose(neg_time, time, tolerance).any()]
)
ntimes = out_times.shape[0]
out_phase = np.ndarray(ntimes)
out_field = np.ndarray(ntimes, dtype=np.int64)
for i_time in range(ntimes):
i_pos = np.absolute(pos_time - out_times[i_time]).argmin()
i_neg = np.absolute(neg_time - out_times[i_time]).argmin()
out_phase[i_time] = pos_phase[i_pos] - neg_phase[i_neg]
out_field[i_time] = fields[i_pos]
return out_times, out_field, phase_wrapping(out_phase)
def _has_valid_data(field_id, time, delays, antenna, ddi=None):
"""
Determine if locit xds has valid data for locit purposes
:param field_id: Array of field ids in time.
:param time: Time axis.
:param delays: Array of delays in time
:param antenna: Antenna key
:param ddi: DDI key
:return:
"""
msg = f"Antenna {get_data_name(antenna)} "
if ddi is not None:
msg += f"DDI {get_data_name(ddi)} "
msg += "has no valid data"
if len(field_id) == 0 or len(time) == 0 or len(delays) == 0:
logger.warning(msg)
return False
else:
return True
def _elevation_ok(nin, antenna, ddi=None):
"""
Determine if elevation limit takes out all the data.
:param nin: Number of filtered points
:param antenna: antenna key
:param ddi: ddi key
:return: True or False
"""
msg = f"Antenna {get_data_name(antenna)} "
if ddi is not None:
msg += f"DDI {get_data_name(ddi)} "
msg += "has no valid data, try decreasing the elevation limit."
if nin > 0:
return True
else:
logger.warning(msg)
return False
def _get_data_from_locit_xds(
xds_data, pol_selection, get_phases=False, split_pols=False
):
"""
Extract data from a .locit.zarr xds, converts the phase gains to delays using the xds frequency
Args:
xds_data: The .locit.zarr xds
pol_selection: Which polarization is requested from the xds
get_phases: return phases rather than delays
split_pols: Different polarizations are not concatenated in a single array if True
Returns:
the field ids
the time in mjd
The delays in seconds or phases in radians
Xds frequency
"""
pol = xds_data.attrs["polarization_scheme"]
freq = xds_data.attrs["frequency"]
if len(pol) != 2:
msg = (
f"Polarization scheme {pol} is not what is expected for antenna based gains"
)
logger.error(msg)
raise ValueError(msg)
elif pol_selection == "both":
phases = [
xds_data[f"P0_PHASE_GAINS"].values,
xds_data[f"P1_PHASE_GAINS"].values,
]
field_id = [xds_data[f"P0_FIELD_ID"].values, xds_data[f"P1_FIELD_ID"].values]
time = [xds_data.p0_time.values, xds_data.p1_time.values]
if not split_pols:
phases = np.concatenate(phases)
field_id = np.concatenate(field_id)
time = np.concatenate(time)
else:
sel_pol_list = [*pol_selection]
phases = []
time = []
field_id = []
for pol_item in sel_pol_list:
if pol_item in pol:
i_pol = np.where(np.array(pol) == pol_item)[0][0]
phases.append(xds_data[f"P{i_pol}_PHASE_GAINS"].values)
time.append(xds_data[f"p{i_pol}_time"].values)
field_id.append(xds_data[f"P{i_pol}_FIELD_ID"].values)
else:
msg = f"Polarization {pol_selection} is not found in data"
logger.warning(msg)
if len(phases) == 0:
msg = f"No valid data found for polarization selection {pol_selection}"
logger.error(msg)
raise RuntimeError(msg)
if not split_pols:
phases = np.concatenate(phases)
field_id = np.concatenate(field_id)
time = np.concatenate(time)
if get_phases:
return field_id, time, phases, freq # field_id, time, phases, frequency
else:
return (
field_id,
time,
phases / twopi / freq,
freq,
) # field_id, time, delays, frequency
def _create_output_xds(
coordinates,
lst,
delays,
fit,
variance,
chi_squared,
model,
locit_parms,
frequency,
elevation_limit,
antenna_info,
):
"""
Create the output xds from the computed quantities and the fit results
Args:
coordinates: The coordinate array used in the fitting
lst: The local sidereal time
delays: The fitted delays
fit: The fit results
variance: the fit error bars
locit_parms: the input parameters
frequency: The frequency or frequencies of the input xds or xdses
elevation_limit: the elevation cutoff
Returns:
The xdt to be plugged to root.
"""
fit_kterm = locit_parms["fit_kterm"]
fit_rate = locit_parms["fit_delay_rate"]
error = np.sqrt(variance)
output_xds = xr.Dataset()
output_xds.attrs["polarization"] = locit_parms["polarization"]
output_xds.attrs["frequency"] = frequency
output_xds.attrs["position_fit"] = fit[1:4]
output_xds.attrs["position_error"] = error[1:4]
output_xds.attrs["fixed_delay_fit"] = fit[0]
output_xds.attrs["fixed_delay_error"] = error[0]
output_xds.attrs["antenna_info"] = antenna_info
output_xds.attrs["elevation_limit"] = elevation_limit
output_xds.attrs["chi_squared"] = chi_squared
if fit_kterm and fit_rate:
output_xds.attrs["koff_fit"] = fit[4]
output_xds.attrs["koff_error"] = error[4]
output_xds.attrs["rate_fit"] = fit[5]
output_xds.attrs["rate_error"] = error[5]
elif fit_kterm and not fit_rate:
output_xds.attrs["koff_fit"] = fit[4]
output_xds.attrs["koff_error"] = error[4]
elif not fit_kterm and fit_rate:
output_xds.attrs["rate_fit"] = fit[4]
output_xds.attrs["rate_error"] = error[4]
else:
pass # Nothing to be added to the attributes
coords = {"time": coordinates[3, :]}
output_xds["DELAYS"] = xr.DataArray(delays, dims=["time"])
output_xds["MODEL"] = xr.DataArray(model, dims=["time"])
output_xds["HOUR_ANGLE"] = xr.DataArray(coordinates[0, :], dims=["time"])
output_xds["DECLINATION"] = xr.DataArray(coordinates[1, :], dims=["time"])
output_xds["ELEVATION"] = xr.DataArray(coordinates[2, :], dims=["time"])
output_xds["LST"] = xr.DataArray(lst, dims=["time"])
return output_xds.assign_coords(coords)
def _fit_data(coordinates, delays, locit_parms):
"""
Execute the fitting using the desired engine, scipy or linear algebra
Args:
coordinates: the shape [4, : ] array with the ha, dec, elevation and time arrays
delays: The delays to be fitted
locit_parms: the locit input paramters
Returns:
fit: the fit results
variance: the diagonal of the covariance matrix
"""
try:
ddi_id = locit_parms["this_ddi"]
except KeyError:
ddi_id = None
label = create_dataset_label(locit_parms["this_ant"], ddi_id)
fit_kterm = locit_parms["fit_kterm"]
fit_rate = locit_parms["fit_delay_rate"]
fit_engine = locit_parms["fit_engine"]
if fit_engine == "linear algebra":
try:
fit, variance = _solve_linear_algebra(
coordinates, delays, fit_kterm, fit_rate
)
return fit, variance, True
except np.linalg.LinAlgError:
logger.warning(
f"Fitting failed for {label}, please try another fitting engine or DDI combination"
)
return np.nan, np.nan, False
elif fit_engine == "scipy":
try:
fit, variance = _solve_scipy_optimize_curve_fit(
coordinates, delays, fit_kterm, fit_rate, verbose=True
)
return fit, variance, True
except TypeError:
logger.warning(
f"Fitting failed for {label}, please try another fitting engine or DDI combination"
)
return np.nan, np.nan, False
else:
msg = f'Unrecognized fitting engine: {locit_parms["fit_engine"]}'
logger.error(msg)
return np.nan, np.nan, False
def _compute_chi_squared(delays, fit, coordinates, fit_kterm, fit_rate):
"""
Compute a model from fit results and computes the chi squared value of that model with respect to the data
Args:
delays: The observed delays
fit: The fit results
coordinates: ha, dec, elevation, time
fit_kterm: K term fitted?
fit_rate: delay rate fitted?
Returns:
The delay model and the chi squared value
"""
model_function, _ = _define_fit_function(fit_kterm, fit_rate)
model = model_function(coordinates, *fit)
n_delays = len(delays)
chi_squared = np.sum((model - delays) ** 2 / n_delays)
return model, chi_squared
def _build_filtered_arrays(
field_id, time, delays, locit_parms, antenna_info, source_dict
):
"""Build the coordinate arrays (ha, dec, elevation, time) for use in the fitting and filters data below the \
elevation limit
Args:
field_id: Array with the observed field per delay
time: Time array with the time of each delay
delays: The delay array
locit_parms: Locit main function parameters
Returns:
coordinates (ha, dec, ele, time), delays, local sidereal time all filtered by elevation limit and the \
elevation_limit
"""
elevation_limit = locit_parms["elevation_limit"] * convert_unit(
"deg", "rad", "trigonometric"
)
geo_pos = antenna_info["geocentric_position"]
ant_pos = EarthLocation.from_geocentric(geo_pos[0], geo_pos[1], geo_pos[2], "meter")
astro_time = Time(time, format="mjd", scale="utc", location=ant_pos)
lst = astro_time.sidereal_time("apparent").to(units.radian) / units.radian
key = "precessed"
n_samples = len(field_id)
coordinates = np.ndarray([4, n_samples])
for i_sample in range(n_samples):
field = str(field_id[i_sample])
coordinates[0:2, i_sample] = source_dict[field][key]
coordinates[2, i_sample] = hadec_to_elevation(
source_dict[field][key], antenna_info["latitude"]
)
coordinates[3, i_sample] = (
time[i_sample] - time[0]
) # time is set to zero at the beginning of obs
# convert to actual hour angle and wrap it to the [-pi, pi) interval
coordinates[0, :] = lst.value - coordinates[0, :]
coordinates[0, :] = np.where(
coordinates[0, :] < 0, coordinates[0, :] + twopi, coordinates[0, :]
)
# Filter data below elevation limit
selection = coordinates[2, :] > elevation_limit
delays = delays[selection]
coordinates = coordinates[:, selection]
lst = lst[selection]
nin = np.sum(selection)
return coordinates, delays, lst, elevation_limit, nin
def _geometrical_coeffs(coordinates):
"""
Compute the position related coefficients for the fitting, also the 1 corresponding to the fixed delay
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
Returns:
the fixed delay coefficient (1), the x, y and z position delay coeffcients
"""
ha, dec = coordinates[0:2]
cosdec = np.cos(dec)
xterm = np.cos(ha) * cosdec
yterm = -np.sin(ha) * cosdec
zterm = np.sin(dec)
return [1.0, xterm, yterm, zterm]
def _kterm_coeff(coordinates):
"""Compute the k term (offset from antenna elevation axis) coefficient from elevation
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
Returns:
The offset from antenna elevation axis delay coefficient
"""
elevation = coordinates[2]
return np.cos(elevation)
def _rate_coeff(coordinates):
"""Compute the delay rate coefficient (basically the time)
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
Returns:
The delay rate coeeficient (time)
"""
return coordinates[3]
def _solve_linear_algebra(coordinates, delays, fit_kterm, fit_rate):
"""
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
delays: The delays
fit_kterm: fit elevation axis offset term
fit_rate: fit delay rate term
Returns:
The fit results and the diagonal of the covariance matrix.
"""
npar = 4 + fit_rate + fit_kterm
system = np.zeros([npar, npar])
vector = np.zeros([npar])
n_samples = coordinates.shape[1]
for i_sample in range(n_samples):
coeffs = _system_coefficients(coordinates[:, i_sample], fit_kterm, fit_rate)
for irow in range(npar):
for icol in range(irow + 1):
system[irow, icol] += coeffs[irow] * coeffs[icol]
vector[irow] += delays[i_sample] * coeffs[irow]
for irow in range(1, npar):
for icol in range(irow):
system[icol, irow] = system[irow, icol]
fit, variance, _ = least_squares(system, vector)
return fit, variance
def _system_coefficients(coordinates, fit_kterm, fit_rate):
"""Build coefficient list for linear algebra fit
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
fit_kterm: fit elevation axis offset term
fit_rate: Fit delay rate term
Returns:
"""
coeffs = _geometrical_coeffs(coordinates)
if fit_kterm:
coeffs.append(_kterm_coeff(coordinates))
if fit_rate:
coeffs.append(_rate_coeff(coordinates))
return coeffs
def _define_fit_function(fit_kterm, fit_rate):
"""
Define the fitting function based on the presence of the delay rate and elevation axis offset terms
Args:
fit_kterm: fit elevation axis offset?
fit_rate: fit delay rate?
Returns:
The appropriate fitting function and the total number of parameters
"""
npar = 4 + fit_rate + fit_kterm
if fit_kterm and fit_rate:
fit_function = _delay_model_kterm_rate
elif fit_kterm and not fit_rate:
fit_function = _delay_model_kterm_norate
elif not fit_kterm and fit_rate:
fit_function = _delay_model_nokterm_rate
else:
fit_function = _delay_model_nokterm_norate
return fit_function, npar
def _solve_scipy_optimize_curve_fit(
coordinates, delays, fit_kterm, fit_rate, verbose=False
):
"""
Fit a delay model to the observed delays using scipy optimize curve_fit algorithm
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
delays: The observed delays
fit_kterm: fit elevation axis offset term
fit_rate: Fit delay rate term
verbose: Display fitting messages
Returns:
The fit results and the diagonal of the covariance matrix
"""
fit_function, npar = _define_fit_function(fit_kterm, fit_rate)
# First guess is no errors in positions, no fixed delay and no delay rate
p0 = np.zeros(npar)
liminf = np.full(npar, -np.inf)
limsup = np.full(npar, +np.inf)
maxfevs = [100000, 1000000, 10000000]
covar = None
fit = None
for maxfev in maxfevs:
try:
results = opt.curve_fit(
fit_function,
coordinates,
delays,
p0=p0,
bounds=[liminf, limsup],
maxfev=maxfev,
)
fit, covar = results[0:2]
except RuntimeError:
if verbose:
logger.info("Increasing number of iterations")
continue
else:
if verbose:
logger.info(
"Converged with less than {0:d} iterations".format(maxfev)
)
break
variance = np.diag(covar)
return fit, variance
def _delay_model_nokterm_norate(coordinates, fixed_delay, xoff, yoff, zoff):
"""
Delay model with no elevation axis offset or delay rate
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
fixed_delay: Fixed delay value
xoff: X direction delay in antenna frame
yoff: Y direction delay in antenna frame
zoff: Z direction delay in antenna frame
Returns:
Delays model at coordinates
"""
coeffs = _geometrical_coeffs(coordinates)
xterm = coeffs[1] * xoff
yterm = coeffs[2] * yoff
zterm = coeffs[3] * zoff
return xterm + yterm + zterm + fixed_delay
def _delay_model_kterm_norate(coordinates, fixed_delay, xoff, yoff, zoff, koff):
"""
Delay model with elevation axis offset and no delay rate
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
fixed_delay: Fixed delay value
xoff: X direction delay in antenna frame
yoff: Y direction delay in antenna frame
zoff: Z direction delay in antenna frame
koff: Elevation axis offset delay
Returns:
Delays model at coordinates
"""
coeffs = _geometrical_coeffs(coordinates)
xterm = coeffs[1] * xoff
yterm = coeffs[2] * yoff
zterm = coeffs[3] * zoff
kterm = _kterm_coeff(coordinates) * koff
return xterm + yterm + zterm + fixed_delay + kterm
def _delay_model_nokterm_rate(coordinates, fixed_delay, xoff, yoff, zoff, rate):
"""
Delay model with delay rate and no elevation axis offset
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
fixed_delay: Fixed delay value
xoff: X direction delay in antenna frame
yoff: Y direction delay in antenna frame
zoff: Z direction delay in antenna frame
rate: delay rate
Returns:
Delays model at coordinates
"""
coeffs = _geometrical_coeffs(coordinates)
xterm = coeffs[1] * xoff
yterm = coeffs[2] * yoff
zterm = coeffs[3] * zoff
sterm = _rate_coeff(coordinates) * rate
return xterm + yterm + zterm + fixed_delay + sterm
def _delay_model_kterm_rate(coordinates, fixed_delay, xoff, yoff, zoff, koff, rate):
"""
Delay model with delay rate and elevation axis offset
Args:
coordinates: coordinate arrays (ha, dec, ele, time)
fixed_delay: Fixed delay value
xoff: X direction delay in antenna frame
yoff: Y direction delay in antenna frame
zoff: Z direction delay in antenna frame
koff: Elevation axis offset delay
rate: delay rate
Returns:
Delays model at coordinates
"""
coeffs = _geometrical_coeffs(coordinates)
xterm = coeffs[1] * xoff
yterm = coeffs[2] * yoff
zterm = coeffs[3] * zoff
sterm = _rate_coeff(coordinates) * rate
kterm = _kterm_coeff(coordinates) * koff
return xterm + yterm + zterm + fixed_delay + kterm + sterm