import json
import numpy as np
import pandas as pd
import toolviper.utils.logger as logger
from typing import Union, List, Any
from rich.console import Console
from astrohack.io.dio import open_holog
def _add_prefix_to_keys(prefix, key_list):
add_prefix_lambda = lambda list_item: f"{prefix}_{list_item}"
return list(map(add_prefix_lambda, key_list))
def _check_if_array_in_dict(array_dict, array):
for key, val in array_dict.items():
if np.array_equiv(val, array):
return key
return False
[docs]
class HologObsDict(dict):
def __init__(self, dict_obj: dict = None):
if dict_obj is None:
super().__init__()
else:
super().__init__(dict_obj)
def __getitem__(self, key: str):
return super().__getitem__(key)
def __setitem__(self, key: str, value: Any):
return super().__setitem__(key, value)
[docs]
def print(self, style: str = "static"):
if style == "dynamic":
from IPython.display import JSON
return JSON(self)
else:
console = Console()
console.log(self, log_locals=False)
return None
@classmethod
[docs]
def from_json_file(cls, filepath):
with open(filepath, "r") as file:
json_dict = json.load(file)
return cls(json_dict)
[docs]
def to_json_file(self, filepath):
with open(filepath, "w") as file:
json.dump(self, file, indent=4)
@classmethod
[docs]
def from_holog_file(cls, filepath):
holog_mds = open_holog(filepath)
return cls(holog_mds.root.attrs["holog_obs_dict"])
@classmethod
[docs]
def create_from_ms_info(
cls,
pnt_mds,
exclude_antennas,
baseline_average_distance,
baseline_average_nearest,
):
ant_names_main = pnt_mds.root.attrs["antenna_names"]
dist_matrix_dict = pnt_mds.root.attrs["baseline_dist_matrix"]
mapping_scans_dict = {}
holog_obs_dict = cls()
map_id = 0
ant_names_set = set()
if exclude_antennas is None:
exclude_antennas = []
elif isinstance(exclude_antennas, str):
exclude_antennas = [exclude_antennas]
else:
pass
for ant_name in exclude_antennas:
prefixed = "ant_" + ant_name
if prefixed not in pnt_mds.keys():
logger.warning(
f"Bad reference antenna {ant_name} is not present in the data."
)
# Generate {ddi: {map: {scan:[i ...], ant:{ant_map_0:[], ...}}}} structure. No reference antennas are added
# because we first need to populate all mapping antennas.
for ant_name, ant_ds in pnt_mds.items():
if "ant" in ant_name:
ant_name = ant_name.replace("ant_", "")
if ant_name in exclude_antennas:
pass
else:
if ant_name in ant_names_main: # Check if antenna in main table.
ant_names_set.add(ant_name)
for ddi, map_dict in ant_ds.attrs["mapping_scans_obs_dict"][
0
].items():
if ddi not in holog_obs_dict:
holog_obs_dict[ddi] = {}
for ant_map_id, scan_list in map_dict.items():
if scan_list:
map_key = _check_if_array_in_dict(
mapping_scans_dict, scan_list
)
if not map_key:
map_key = "map_" + str(map_id)
mapping_scans_dict[map_key] = scan_list
map_id = map_id + 1
if map_key not in holog_obs_dict[ddi]:
holog_obs_dict[ddi][map_key] = {
"scans": scan_list,
"ant": {},
}
holog_obs_dict[ddi][map_key]["ant"][ant_name] = []
df_mat = pd.DataFrame.from_dict(dist_matrix_dict, orient="index")
if (baseline_average_distance != "all") and (baseline_average_nearest != "all"):
logger.error(
"baseline_average_distance and baseline_average_nearest can not both be specified."
)
raise RuntimeError("Too many baseline parameters specified.")
# The reference antennas are then given by ref_ant_set = ant_names_set - map_ant_set.
for ddi, ddi_dict in holog_obs_dict.items():
for map_id, map_dict in ddi_dict.items():
map_ant_set = set(map_dict["ant"].keys())
# Need a copy because of del holog_obs_dict[ddi][map_id]['ant'][map_ant_key] below.
map_ant_keys = list(map_dict["ant"].keys())
for map_ant_key in map_ant_keys:
ref_ant_set = ant_names_set - map_ant_set
# Select reference antennas by distance from mapping antenna
if baseline_average_distance != "all":
sub_ref_ant_set = []
for ref_ant in ref_ant_set:
if (
df_mat.loc[map_ant_key, ref_ant]
< baseline_average_distance
):
sub_ref_ant_set.append(ref_ant)
if (not sub_ref_ant_set) and ref_ant_set:
logger.warning(
"DDI "
+ str(ddi)
+ " and mapping antenna "
+ str(map_ant_key)
+ " has no reference antennas. If baseline_average_distance was specified "
"increase this distance. See antenna distance matrix in log by setting "
"debug level to DEBUG in client function."
)
ref_ant_set = sub_ref_ant_set
# Select reference antennas by the n-closest antennas
if baseline_average_nearest != "all":
sub_ref_ant_set = []
nearest_ant_list = (
df_mat.loc[map_ant_key, :]
.loc[list(ref_ant_set)]
.sort_values()
.index.tolist()[0:baseline_average_nearest]
)
logger.debug(nearest_ant_list)
for ref_ant in ref_ant_set:
if ref_ant in nearest_ant_list:
sub_ref_ant_set.append(ref_ant)
ref_ant_set = sub_ref_ant_set
##################################################
if ref_ant_set:
holog_obs_dict[ddi][map_id]["ant"][map_ant_key] = list(
ref_ant_set
)
else:
del holog_obs_dict[ddi][map_id]["ant"][
map_ant_key
] # Don't want mapping antennas with no reference antennas.
logger.warning(
"DDI "
+ str(ddi)
+ " and mapping antenna "
+ str(map_ant_key)
+ " has no reference antennas."
)
return holog_obs_dict
def _select_ddi(self, selected_values: Union[int, List[int]]):
prefixed_selected_values = _add_prefix_to_keys("ddi", selected_values)
ddi_list = list(self.keys())
for ddi_key in ddi_list:
if ddi_key not in prefixed_selected_values:
self.pop(ddi_key)
return
def _select_antenna(self, selected_values: Union[str, List[str]]):
for ddi_key in self.keys():
for map_key in self[ddi_key].keys():
ant_list = list(self[ddi_key][map_key]["ant"].keys())
for ant_key in ant_list:
if ant_key not in selected_values:
self[ddi_key][map_key]["ant"].pop(ant_key)
return
def _select_map(self, selected_values: Union[int, List[int]]):
prefixed_selected_values = _add_prefix_to_keys("map", selected_values)
for ddi_key in self.keys():
map_list = list(self[ddi_key].keys())
for map_key in map_list:
if map_key not in prefixed_selected_values:
self[ddi_key].pop(map_key)
def _select_scan(self, selected_values: Union[int, List[int]]):
for ddi_key in self.keys():
for map_key in self[ddi_key].keys():
self[ddi_key][map_key]["scan"] = selected_values
return
@staticmethod
[docs]
def get_nearest_baselines(
antenna: str, dist_matrix_dict, n_baselines: int = None
) -> object:
df_matrix = pd.DataFrame.from_dict(dist_matrix_dict, orient="index")
# Skip the first index because it is a self distance
if n_baselines is None:
return (
df_matrix[antenna].sort_values(ascending=True).index[1:].values.tolist()
)
return (
df_matrix[antenna]
.sort_values(ascending=True)
.index[1:n_baselines]
.values.tolist()
)
[docs]
def select_baseline(
self, selected_values, n_baselines, dist_matrix_dict, reference=None
):
if reference is not None:
if not isinstance(reference, list):
reference = [reference]
ddi_list = list(self.keys())
for ddi in ddi_list:
map_list = list(self[ddi].keys())
for mp in map_list:
ant_list = list(self[ddi][mp]["ant"].keys())
for ant in ant_list:
if ant not in selected_values:
self[ddi][mp]["ant"].pop(ant)
continue
if reference is None and n_baselines is not None:
reference_antennas = self[ddi][mp]["ant"][ant]
if n_baselines > len(reference_antennas):
n_baselines = len(reference_antennas)
sorted_antennas = np.array(
self.get_nearest_baselines(ant, dist_matrix_dict)
)
values, i, j = np.intersect1d(
reference_antennas, sorted_antennas, return_indices=True
)
index = np.sort(j)
self[ddi][mp]["ant"][ant] = sorted_antennas[index][:n_baselines]
else:
self[ddi][mp]["ant"][ant] = reference
return
[docs]
def select(self, key, selected_values):
if not isinstance(selected_values, (list, tuple)):
selected_values = [selected_values]
match key:
case "ddi":
self._select_ddi(selected_values)
case "antenna":
self._select_antenna(selected_values)
case "map":
self._select_map(selected_values)
case "scan":
self._select_scan(selected_values)
return