import numpy
import numpy as np
import scipy.signal as scisig
import scipy.constants
import xarray as xr
from numba import njit
import pandas as pd
from scipy.spatial import distance_matrix
import toolviper.utils.logger as logger
from astrohack.utils.text import format_angular_distance, create_dataset_label
from astrohack.utils.conversion import convert_unit
from astrohack.utils.constants import pi, twopi, njit_caching
[docs]
def tokenize_version_number(version_number):
"""
Tokenize a version number into an array of integers
Args:
version_number: The astrohack number version to be tokenized
Returns:
Tokenized version number in 3 element numpy array of integers
"""
if not isinstance(version_number, str):
raise ValueError(f"Version number: {version_number} is not a string")
split = version_number.split(".")
if len(split) != 3:
raise ValueError(f"Version number: {version_number} is badly formated")
tokenized = np.ndarray([3], dtype=int)
for itoken in range(len(split)):
try:
tokenized[itoken] = int(split[itoken])
except ValueError:
raise ValueError(
f"Version number: {version_number} is not composed of integers"
)
return tokenized
[docs]
def data_from_version_needs_patch(version_to_check, patched_version):
"""
Check if data from a version needs to be patched according to a reference patch version
Args:
version_to_check: The version that is being tested
patched_version: Reference version at which the patch is no longer needed
Returns:
True if the checked version is from before the patch False elsewise
"""
check = tokenize_version_number(version_to_check)
patched = tokenize_version_number(patched_version)
for itoken in range(3):
if check[itoken] < patched[itoken]:
return True
elif check[itoken] > patched[itoken]:
return False
else:
continue
return False
def _apply_mask(data, scaling=0.5):
"""Applies a cropping mask to the input data according to the scale factor
Args:
data (numpy,ndarray): numpy array containing the aperture grid.
scaling (float, optional): Scale factor which is used to determine the amount of the data to crop, ex. scale=0.5
means to crop the data by 50%. Defaults to 0.5.
Returns:
numpy.ndarray: cropped aperture grid data
"""
x, y = data.shape
assert scaling > 0, logger.error("Scaling must be > 0")
mask = int(x // (1 // scaling))
assert mask > 0, logger.error(
"Scaling values too small. Minimum values is:{}, though search may still fail due to lack of points.".format(
1 / x
)
)
start = int(x // 2 - mask // 2)
return data[start : (start + mask), start : (start + mask)]
[docs]
def calc_coords(image_size, cell_size):
"""Calculate the center pixel of the image given a cell and image size
Args:
image_size (np.ndarray): image size
cell_size (np.ndarray): cell size
Returns:
float, float: center pixel location in coordinates x, y
"""
image_center = image_size // 2
x = (
np.arange(-image_center[0], image_size[0] - image_center[0]) * cell_size[0]
+ cell_size[0] / 2
)
y = (
np.arange(-image_center[1], image_size[1] - image_center[1]) * cell_size[1]
+ cell_size[1] / 2
)
return x, y
[docs]
def find_nearest(array, value):
"""Find the nearest entry in array to that of value.
Args:
array (numpy.array): _description_
value (float): _description_
Returns:
int, float: index, array value
"""
array = np.asarray(array)
idx = (np.abs(array - value)).argmin()
return idx, array[idx]
[docs]
def chunked_average(data, weight, avg_map, avg_freq):
"""
Average visibilities in chunks with close enough frequencies
Args:
data: Visibility data
weight: Visibility weights
avg_map: mapping of channels to average
avg_freq: new frequency ranges
Returns:
Chunked average of visibilities and weights
"""
avg_chan_index = np.arange(avg_freq.shape[0])
data_avg_shape = list(data.shape)
n_time, n_chan, n_pol = data_avg_shape
n_avg_chan = avg_freq.shape[0]
# Update new chan dim.
data_avg_shape[1] = n_avg_chan
data_avg = np.zeros(data_avg_shape, dtype=np.complex128)
weight_sum = np.zeros(data_avg_shape, dtype=np.float64)
index = 0
for avg_index in avg_chan_index:
while (index < n_chan) and (avg_map[index] == avg_index):
# Most probably will have to unravel assignment
data_avg[:, avg_index, :] = (
data_avg[:, avg_index, :] + weight[:, index, :] * data[:, index, :]
)
weight_sum[:, avg_index, :] = (
weight_sum[:, avg_index, :] + weight[:, index, :]
)
index = index + 1
for time_index in range(n_time):
for pol_index in range(n_pol):
if weight_sum[time_index, avg_index, pol_index] == 0:
data_avg[time_index, avg_index, pol_index] = 0.0
else:
data_avg[time_index, avg_index, pol_index] = (
data_avg[time_index, avg_index, pol_index]
/ weight_sum[time_index, avg_index, pol_index]
)
return data_avg, weight_sum
def _calculate_euclidean_distance(x, y, center):
"""Calculates the Euclidean distance between a pair of input points.
Args:
x (float, np.ndarray): x-coordinate
y (float, np.ndarray): y-coordinate
center (tuple (float)): float tuple containing the coordinates to the center pixel
Returns:
float, np.ndarray: euclidean distance of points from center pixel
"""
return np.sqrt(np.power(x - center[0], 2) + np.power(y - center[1], 2))
[docs]
def find_peak_beam_value(data, height=0.5, scaling=0.5):
"""Search algorithm to determine the maximal signal peak in the beam pattern.
Args:
data (numpy.ndarray): beam data grid
height (float, optional): Peak threshold. Looks for the maximum peak in data and uses a percentage of this
peak to determine a threshold for other peaks. Defaults to 0.5.
scaling (float, optional): scaling factor for beam data cropping. Defaults to 0.5.
Returns:
float: peak maximum value
"""
masked_data = _apply_mask(data, scaling=scaling)
array = masked_data.flatten()
cutoff = np.abs(array).max() * height
index, _ = scisig.find_peaks(np.abs(array), height=cutoff)
x, y = np.unravel_index(index, masked_data.shape)
center = (masked_data.shape[0] // 2, masked_data.shape[1] // 2)
distances = _calculate_euclidean_distance(x, y, center)
index = distances.argmin()
return masked_data[x[index], y[index]]
[docs]
def gauss_elimination(system, vector):
"""
Gauss elimination solving of a system using numpy
Args:
system: System matrix to be solved
vector: Vector that represents the right hand side of the system
Returns:
The solved system
"""
inverse = np.linalg.inv(system)
return np.dot(inverse, vector)
[docs]
def least_squares(system, vector, return_sigma=False):
"""
Least squares fitting of a system of linear equations
The variances are simplified as the diagonal of the covariances
Args:
system: System matrix to be solved
vector: Vector that represents the right hand side of the system
return_sigma: Return sigma value
Returns:
The solved system, the variances of the system solution and the sum of the residuals
"""
if len(system.shape) != 2:
raise ValueError("System must have 2 dimensions")
if system.shape[0] < system.shape[1]:
raise ValueError(
"System must have at least the same number of rows as it has of columns"
)
result, residuals, _, _ = np.linalg.lstsq(system, vector, rcond=None)
dof = len(vector) - len(result)
if dof > 0:
errs = (vector - np.dot(system, result)) / dof
else:
errs = vector - np.dot(system, result)
sigma2 = np.sum(errs**2)
covar = np.linalg.inv(np.dot(system.T, system))
variance = np.diag(sigma2 * covar)
if return_sigma:
return result, variance, residuals, np.sqrt(sigma2)
else:
return result, variance, residuals
@njit(cache=njit_caching, nogil=True)
[docs]
def least_squares_jit(system, vector):
"""
Least squares fitting of a system of linear equations
The variances are simplified as the diagonal of the covariances
Args:
system: System matrix to be solved
vector: Vector that represents the right hand side of the system
Returns:
The solved system, the variances of the system solution and the sum of the residuals
"""
if len(system.shape) != 2:
raise ValueError("System must have 2 dimensions")
if system.shape[0] < system.shape[1]:
raise ValueError(
"System must have at least the same number of rows as it has of columns"
)
result, residuals, _, _ = np.linalg.lstsq(system, vector)
dof = len(vector) - len(result)
if dof > 0:
errs = (vector - np.dot(system, result)) / dof
else:
errs = vector - np.dot(system, result)
sigma2 = np.sum(errs**2)
covar = np.linalg.inv(np.dot(system.T, system))
variance = np.diag(sigma2 * covar)
return result, variance, residuals, np.sqrt(sigma2)
def _least_squares_fit_block(system, vector):
"""
Least squares fitting of a system of linear equations
The variances are simplified as the diagonal of the covariances
Args:
system: System matrix to be solved
vector: Vector that represents the right hand side of the system
Returns:
The solved system and the variances of the system solution
"""
if len(system.shape) < 2:
raise ValueError("System block must have at least 2 dimensions")
if system.shape[-2] < system.shape[-1]:
raise ValueError(
"Systems must have at least the same number of rows as they have of columns"
)
shape = system.shape
results = np.zeros_like(vector)
variances = np.zeros_like(vector)
if len(shape) > 2:
for it0 in range(shape[0]):
results[it0], variances[it0] = _least_squares_fit_block(
system[it0], vector[it0]
)
else:
results, variances, _ = least_squares(system, vector)
return results, variances
[docs]
def calculate_optimal_grid_parameters(
pnt_map_dict, antenna_name, telescope_diameter, chan_freq, ddi
):
reference_frequency = np.median(chan_freq)
reference_lambda = scipy.constants.speed_of_light / reference_frequency
# reference_lambda / D is the maximum cell size we should use so reduce is by 85% to get a safer answer.
# Since this is just an estimate for the situation where the user doesn't specify a values, I am picking
# a values according to the developer heuristic, i.e. it seems to be good.
cell_size = 0.85 * reference_lambda / telescope_diameter
pnt_off = pnt_map_dict[antenna_name].POINTING_OFFSET.values
# Get data range
data_range = np.array(
[
np.nanmax(pnt_off[:, 0]) - np.nanmin(pnt_off[:, 0]),
np.nanmax(pnt_off[:, 1]) - np.nanmin(pnt_off[:, 1]),
]
)
logger.info(
f"{create_dataset_label(antenna_name, ddi)}: Suggested cell size {format_angular_distance(cell_size)}, "
f"FOV: ({format_angular_distance(data_range[0])}, {format_angular_distance(data_range[1])})"
)
try:
n_pix = np.ceil(data_range / cell_size)
except ZeroDivisionError:
logger.error(
f"Zero division error, there was likely a problem calculating the data range.",
verbose=True,
)
raise ZeroDivisionError
if n_pix[0] != n_pix[1]:
n_pix[:] = np.max(n_pix)
return [int(n_pix[0]), int(n_pix[1])], cell_size.tolist()
[docs]
def compute_average_stokes_visibilities(vis, stokes):
n_chan = len(vis.chan)
chan_ave_vis = vis.mean(dim="chan", skipna=True)
amp, pha, sigma_amp, sigma_pha = compute_stokes(
chan_ave_vis["VIS"].values,
n_chan * chan_ave_vis["WEIGHT"].values,
chan_ave_vis.pol,
)
coords = {"time": chan_ave_vis.time, "pol": ["I", "Q", "U", "V"]}
xds = xr.Dataset()
xds = xds.assign_coords(coords)
xds["AMPLITUDE"] = xr.DataArray(amp, dims=["time", "pol"], coords=coords)
xds["PHASE"] = xr.DataArray(pha, dims=["time", "pol"], coords=coords)
xds["SIGMA_AMP"] = xr.DataArray(sigma_amp, dims=["time", "pol"], coords=coords)
xds["SIGMA_PHA"] = xr.DataArray(sigma_amp, dims=["time", "pol"], coords=coords)
xds.attrs["frequency"] = np.mean(vis.chan) / 1e9 # in GHz
return xds.sel(pol=stokes)
[docs]
def compute_stokes(data, weight, pol_axis):
stokes_data = np.zeros_like(data)
weight[weight == 0] = np.nan
sigma = np.sqrt(1 / weight)
sigma_amp = np.zeros_like(weight)
if "RR" in pol_axis:
stokes_data[:, 0] = (data[:, 0] + data[:, 3]) / 2
sigma_amp[:, 0] = (sigma[:, 0] + sigma[:, 3]) / 2
stokes_data[:, 1] = (data[:, 1] + data[:, 2]) / 2
sigma_amp[:, 1] = (sigma[:, 1] + sigma[:, 2]) / 2
stokes_data[:, 2] = 1j * (data[:, 1] - data[:, 2]) / 2
sigma_amp[:, 2] = sigma_amp[:, 1]
stokes_data[:, 3] = (data[:, 0] - data[:, 3]) / 2
sigma_amp[:, 0] = (sigma[:, 0] + sigma[:, 3]) / 2
elif "XX" in pol_axis:
stokes_data[:, 0] = (data[:, 0] + data[:, 3]) / 2
sigma_amp[:, 0] = (sigma[:, 0] + sigma[:, 3]) / 2
stokes_data[:, 1] = (data[:, 0] - data[:, 3]) / 2
sigma_amp[:, 1] = sigma_amp[:, 0]
stokes_data[:, 2] = (data[:, 1] + data[:, 2]) / 2
sigma_amp[:, 2] = (sigma[:, 1] + sigma[:, 2]) / 2
stokes_data[:, 3] = 1j * (data[:, 1] - data[:, 2]) / 2
sigma_amp[:, 3] = sigma_amp[:, 2]
else:
raise ValueError("Pol not supported " + str(pol_axis))
stokes_amp = np.absolute(stokes_data)
stokes_pha = np.angle(stokes_data, deg=True)
sigma_amp[~np.isfinite(sigma_amp)] = np.nan
sigma_amp[sigma_amp == 0] = np.nan
snr = stokes_amp / sigma_amp
cst = np.sqrt(9 / (2 * np.pi**3))
# Both sigmas here are probably wrong because of the uncertainty of how weights are stored.
sigma_pha = np.pi / np.sqrt(3) * (1 - cst * snr)
sigma_pha = np.where(snr > 2.5, 1 / snr, sigma_pha)
sigma_pha *= convert_unit("rad", "deg", "trigonometric")
return stokes_amp, stokes_pha, sigma_amp, sigma_pha
[docs]
def compute_antenna_relative_off(antenna, tel_lon, tel_lat, tel_rad, scaling=1.0):
"""
Computes an antenna offset to the array center
Args:
antenna: Antenna information dictionary
tel_lon: array center longitude
tel_lat: array center latitude
tel_rad: array center's distance to the center of the earth
scaling: scale factor
Returns:
Offset to the east, Offset to the North, elevation offset and distance to array center
"""
antenna_off_east = tel_rad * (antenna["longitude"] - tel_lon) * np.cos(tel_lat)
antenna_off_north = tel_rad * (antenna["latitude"] - tel_lat)
antenna_off_ele = antenna["radius"] - tel_rad
antenna_dist = np.sqrt(
antenna_off_east**2 + antenna_off_north**2 + antenna_off_ele**2
)
return (
antenna_off_east * scaling,
antenna_off_north * scaling,
antenna_off_ele * scaling,
antenna_dist * scaling,
)
[docs]
def rotate_to_gmt(positions, errors, longitude):
"""
Rotate geometrical delays from antenna reference frame to GMT reference frame
Args:
positions: geometrical delays
errors: geometrical delay errors
longitude: Antenna longitude
Returns:
Rotated geometrical delays and associated errors
"""
xpos, ypos = positions[0:2]
delta_lon = longitude
cosdelta = np.cos(delta_lon)
sindelta = np.sin(delta_lon)
newpositions = positions
newpositions[0] = xpos * cosdelta - ypos * sindelta
newpositions[1] = xpos * sindelta + ypos * cosdelta
newerrors = errors
xerr, yerr = errors[0:2]
newerrors[0] = np.sqrt((xerr * cosdelta) ** 2 + (yerr * sindelta) ** 2)
newerrors[1] = np.sqrt((yerr * cosdelta) ** 2 + (xerr * sindelta) ** 2)
return newpositions, newerrors
[docs]
def data_statistics(data_array):
data_stats = {
"mean": np.nanmean(data_array),
"median": np.nanmedian(data_array),
"rms": np.nanstd(data_array),
"min": np.nanmin(data_array),
"max": np.nanmax(data_array),
}
return data_stats
[docs]
def phase_wrapping(phase):
"""
Wraps phase to the -pi to pi interval
Args:
phase: phase to be wrapped
Returns:
Phase wrapped to the -pi to pi interval
"""
return (phase + pi) % twopi - pi
[docs]
def create_coordinate_images(x_axis, y_axis, create_polar_coordinates=False):
"""
Takes two axes and creates 2D representation of the image coordinates
Args:
x_axis: X axis
y_axis: Y axis
create_polar_coordinates: Also create polar coordinates images?
Returns:
x_mesh and y_mesh, plus radius_mesh and polar_angle_mesh if create_polar_coordinates
"""
shape = [x_axis.shape[0], y_axis.shape[0]]
x_mesh = np.empty(shape)
y_mesh = np.empty(shape)
x_mesh[:, :] = x_axis[np.newaxis, :]
y_mesh[:, :] = -y_axis[:, np.newaxis]
if create_polar_coordinates:
radius_mesh = np.sqrt(x_mesh**2 + y_mesh**2)
polar_angle_mesh = (
phase_wrapping(np.arctan2(y_mesh, x_mesh) + np.pi / 2) + np.pi
)
polar_angle_mesh = np.where(
polar_angle_mesh < 0, polar_angle_mesh + twopi, polar_angle_mesh
)
return x_mesh, y_mesh, radius_mesh, polar_angle_mesh
else:
return x_mesh, y_mesh
[docs]
def arm_shadow_masking(
inmask, x_mesh, y_mesh, radius_mesh, minradius, maxradius, width, angle
):
radial_mask = np.where(radius_mesh < minradius, False, inmask)
radial_mask = np.where(radius_mesh >= maxradius, False, radial_mask)
if angle % pi / 2 == 0:
oumask = np.where(
np.bitwise_and(np.abs(x_mesh) < width / 2.0, radial_mask), False, inmask
)
oumask = np.where(
np.bitwise_and(np.abs(y_mesh) < width / 2.0, radial_mask), False, oumask
)
else:
# first shadow
coeff = np.tan(angle % pi)
distance = np.abs((coeff * x_mesh - y_mesh) / np.sqrt(coeff**2 + 1))
oumask = np.where(
np.bitwise_and(distance < width / 2.0, radial_mask), False, inmask
)
# second shadow
coeff = np.tan(angle % pi + pi / 2)
distance = np.abs((coeff * x_mesh - y_mesh) / np.sqrt(coeff**2 + 1))
oumask = np.where(
np.bitwise_and(distance < width / 2.0, radial_mask), False, oumask
)
return oumask
[docs]
def are_axes_equal(axis_a, axis_b):
if axis_a.shape[0] != axis_b.shape[0]:
return False
return np.all(axis_a == axis_b)
[docs]
def create_2d_array_reconstruction_array(x_axis, y_axis, mask):
# Creating grid reconstruction
n_valid = np.sum(mask)
x_idx = np.arange(x_axis.shape[0], dtype=int)
y_idx = np.arange(y_axis.shape[0], dtype=int)
x_idx_grd, y_idx_grd = np.meshgrid(x_idx, y_idx, indexing="ij")
xy_idx_grid = np.empty([n_valid, 2], dtype=int)
xy_idx_grid[:, 0] = x_idx_grd[mask]
xy_idx_grid[:, 1] = y_idx_grd[mask]
return xy_idx_grid
[docs]
def regrid_data_onto_2d_grid(x_axis, y_axis, linear_array, grid_idx):
"""
Use index information to get 1D data back onto a 2D grid.
Args:
x_axis: X axis of the data on the 2 D grid
y_axis: Y axis of the data on the 2 D grid
linear_array: Linearized masked array
grid_idx: Linearized 2 D indexes onto the original grid
Returns:
Data regridded onto a 2D array
"""
nx = x_axis.shape[0]
ny = y_axis.shape[0]
if linear_array.ndim == 1:
grid_shape = [nx, ny]
else:
grid_shape = [nx, ny, linear_array.shape[1]]
gridded = np.full(grid_shape, np.nan)
gridded[grid_idx[:, 0], grid_idx[:, 1]] = linear_array[:]
return gridded
[docs]
def compute_antenna_baseline_distance_matrix_dict(ant_pos, ant_names):
"""
Compute a matrix of antenna position distances from antenna positions
:param ant_pos: antenna position array
:param ant_names: antenna names array
:return: dict with antenna distance matrix
"""
pos_df = pd.DataFrame(ant_pos, columns=["x", "y", "z"], index=ant_names)
dist_mat_df = pd.DataFrame(
distance_matrix(pos_df.values, pos_df.values),
index=pos_df.index,
columns=pos_df.index,
)
return dist_mat_df.to_dict(orient="index")