Source code for astrohack.core.extract_holog

import os

import numpy as np
import xarray as xr
import astropy
import toolviper.utils.logger as logger

from numba import njit
from numba.core import types

from casacoretables import tables as ctables

from astrohack.io.holog_mds import AstrohackHologFile
from astrohack.utils.tools import get_valid_state_ids
from astrohack.antenna.telescope import get_proper_telescope
from astrohack.utils.text import create_dataset_label
from astrohack.utils.imaging import calculate_parallactic_angle_chunk
from astrohack.utils.algorithms import calculate_optimal_grid_parameters
from astrohack.utils.conversion import casa_time_to_mjd
from astrohack.utils.constants import twopi, clight
from astrohack.utils.gridding import grid_1d_data
from astrohack.utils.constants import pol_str, njit_caching
from astrohack.core.holog_obs_dict import HologObsDict


[docs] def extract_holog_preprocessing(extract_holog_params, pnt_mds): holog_obs_dict = extract_holog_params["holog_obs_dict"] # Get spectral windows ctb = ctables.table( os.path.join(extract_holog_params["ms_name"], "DATA_DESCRIPTION"), readonly=True, lockoptions={"option": "usernoread"}, ack=False, ) ddi_spw = ctb.getcol("SPECTRAL_WINDOW_ID") ddpol_indexol = ctb.getcol("POLARIZATION_ID") ctb.close() ant_names = pnt_mds.root.attrs["antenna_names"] ant_ids = pnt_mds.root.attrs["antenna_ids"] ant_stations = pnt_mds.root.attrs["antenna_stations"] # Create holog_obs_dict or modify user supplied holog_obs_dict. if holog_obs_dict is None: holog_obs_dict = HologObsDict.create_from_ms_info( pnt_mds=pnt_mds, exclude_antennas=extract_holog_params["exclude_antennas"], baseline_average_distance=extract_holog_params["baseline_average_distance"], baseline_average_nearest=extract_holog_params["baseline_average_nearest"], ) user_ddi_sel = extract_holog_params["ddi"] if isinstance(user_ddi_sel, int): user_ddi_sel = [user_ddi_sel] if user_ddi_sel != "all": holog_obs_dict.select("ddi", user_ddi_sel) user_ant_sel = extract_holog_params["ant"] if isinstance(user_ant_sel, int): user_ant_sel = [user_ant_sel] if user_ant_sel != "all": holog_obs_dict.select("antenna", user_ant_sel) ctb = ctables.table( os.path.join(extract_holog_params["ms_name"], "STATE"), readonly=True, lockoptions={"option": "usernoread"}, ack=False, ) # Scan intent (with subscan intent) is stored in the OBS_MODE column of the STATE sub-table. obs_modes = ctb.getcol("OBS_MODE") ctb.close() state_ids = get_valid_state_ids(obs_modes) spw_ctb = ctables.table( os.path.join(extract_holog_params["ms_name"], "SPECTRAL_WINDOW"), readonly=True, lockoptions={"option": "usernoread"}, ack=False, ) pol_ctb = ctables.table( os.path.join(extract_holog_params["ms_name"], "POLARIZATION"), readonly=True, lockoptions={"option": "usernoread"}, ack=False, ) telescope_name = pnt_mds.root.attrs["telescope_name"] # start_time_unix = obs_ctb.getcol('TIME_RANGE')[0][0] - 3506716800.0 # time = Time(start_time_unix, format='unix').jyear # If we have an EVLA run from before 2023 the pointing table needs to be fixed. if telescope_name == "EVLA": # and time < 2023: n_mapping = 0 for ddi_key, ddi_dict in holog_obs_dict.items(): n_map_ddi = 0 for map_dict in ddi_dict.values(): n_map_ddi += len(map_dict["ant"]) if n_map_ddi == 0: logger.warning(f"DDI {ddi_key} has 0 mapping antennas") n_mapping += n_map_ddi if n_mapping == 0: msg = "No mapping antennas to process, maybe you need to fix the pointing table?" logger.error(msg) raise RuntimeError(msg) looping_dict = {} for ddi_key in holog_obs_dict.keys(): ddi = int(ddi_key.replace("ddi_", "")) spw_setup_id = ddi_spw[ddi] pol_setup_id = ddpol_indexol[ddi] chan_freq = spw_ctb.getcol("CHAN_FREQ", startrow=spw_setup_id, nrow=1)[0, :] chan_width = spw_ctb.getcol("CHAN_WIDTH", startrow=spw_setup_id, nrow=1)[0, :] eff_bw = spw_ctb.getcol("EFFECTIVE_BW", startrow=spw_setup_id, nrow=1)[0, :] ref_freq = spw_ctb.getcol("REF_FREQUENCY", startrow=spw_setup_id, nrow=1)[0] total_bw = spw_ctb.getcol("TOTAL_BANDWIDTH", startrow=spw_setup_id, nrow=1)[0] chan_setup = { "chan_freq": chan_freq, "chan_width": chan_width, "eff_bw": eff_bw, "ref_freq": ref_freq, "total_bw": total_bw, } pol_setup = { "pol": pol_str[ pol_ctb.getcol("CORR_TYPE", startrow=pol_setup_id, nrow=1)[0, :] ] } ddi_dict = {} for map_key, map_data in holog_obs_dict[ddi_key].items(): scans = map_data["scans"] if len(scans) > 1: logger.info( "Pre-processing ddi: {ddi}, scans: [{min} ... {max}]".format( ddi=ddi, min=scans[0], max=scans[-1] ) ) else: logger.info( "Pre-processing ddi: {ddi}, scan: {scan}".format( ddi=ddi, scan=scans ) ) if len(list(map_data["ant"].keys())) != 0: map_ant_list = [] ref_ant_per_map_ant_list = [] map_ant_name_list = [] ref_ant_per_map_ant_name_list = [] ref_ant_stations_list = [] for map_ant_name, ref_ant_name_list in map_data["ant"].items(): ref_ant_ids = _convert_ant_name_to_id( ref_ant_name_list, ant_names, ant_ids, ) map_ant_id = _convert_ant_name_to_id( map_ant_name, ant_names, ant_ids )[0] ref_ant_per_map_ant_list.append(ref_ant_ids) map_ant_list.append(map_ant_id) ref_ant_per_map_ant_name_list.append( list(map_data["ant"][map_ant_name]) ) ref_ant_stations = _convert_ant_name_to_id( ref_ant_name_list, ant_names, ant_stations ) ref_ant_stations_list.append(ref_ant_stations) map_ant_name_list.append(map_ant_name) map_dict = { "ref_ant_per_map_ant_tuple": tuple(ref_ant_per_map_ant_list), "map_ant_tuple": tuple(map_ant_list), "ref_ant_per_map_ant_name_tuple": tuple( ref_ant_per_map_ant_name_list ), "map_ant_name_tuple": tuple(map_ant_name_list), "ant_stations": ant_stations, "scans": scans, "sel_state_ids": state_ids, "ant_names": ant_names, "chan_setup": chan_setup, "pol_setup": pol_setup, } ddi_dict[map_key] = map_dict else: logger.warning( f"{create_dataset_label(ddi_key, map_key)} has no holography data to extract." ) looping_dict[ddi_key] = ddi_dict spw_ctb.close() pol_ctb.close() return looping_dict, holog_obs_dict
[docs] def process_extract_holog_chunk( extract_holog_params: dict, holog_mds: AstrohackHologFile ): """Perform data query on holography data chunk and get unique time and state_ids/ Args: extract_holog_params (dict): parameters controlling work to be done on this chunk holog_mds (AstrohackHologFile): Output holog mds file """ ms_name = extract_holog_params["ms_name"] data_column = extract_holog_params["data_column"] time_interval = extract_holog_params["time_smoothing_interval"] pointing_interpolation_method = extract_holog_params[ "pointing_interpolation_method" ] holog_name = extract_holog_params["holog_name"] pnt_mds = extract_holog_params["pnt_mds"] inp_data_dict = extract_holog_params["dic_data"] ddi_key = extract_holog_params["this_ddi"] map_key = extract_holog_params["this_map"] ddi_id = int(ddi_key.replace("ddi_", "")) scans = inp_data_dict["scans"] ant_names = inp_data_dict["ant_names"] ant_stations = inp_data_dict["ant_stations"] ref_ant_per_map_ant_tuple = inp_data_dict["ref_ant_per_map_ant_tuple"] map_ant_tuple = inp_data_dict["map_ant_tuple"] map_ant_name_tuple = inp_data_dict["map_ant_name_tuple"] sel_state_ids = inp_data_dict["sel_state_ids"] chan_setup = inp_data_dict["chan_setup"] pol = inp_data_dict["pol_setup"]["pol"] chan_freq = chan_setup["chan_freq"] # This piece of information is no longer used leaving them here commented out for completeness # ref_ant_per_map_ant_name_tuple = extract_holog_params["ref_ant_per_map_ant_name_tuple"] if len(ref_ant_per_map_ant_tuple) != len(map_ant_tuple): msg = "Reference antenna per mapping antenna list and mapping antenna list should have same length." logger.error(msg) raise RuntimeError(msg) table_obj = ctables.table( ms_name, readonly=True, lockoptions={"option": "usernoread"}, ack=False ) scans = [int(scan) for scan in scans] if sel_state_ids: ctb = ctables.taql( "select %s, SCAN_NUMBER, ANTENNA1, ANTENNA2, TIME, TIME_CENTROID, WEIGHT, FLAG_ROW, FLAG, FIELD_ID from " "$table_obj WHERE DATA_DESC_ID == %s AND SCAN_NUMBER in %s AND STATE_ID in %s" % (data_column, ddi_id, scans, list(sel_state_ids)) ) else: ctb = ctables.taql( "select %s, SCAN_NUMBER, ANTENNA1, ANTENNA2, TIME, TIME_CENTROID, WEIGHT, FLAG_ROW, FLAG, FIELD_ID from " "$table_obj WHERE DATA_DESC_ID == %s AND SCAN_NUMBER in %s" % (data_column, ddi_id, scans) ) vis_data = ctb.getcol(data_column) weight = ctb.getcol("WEIGHT") ant1 = ctb.getcol("ANTENNA1") ant2 = ctb.getcol("ANTENNA2") time_vis_row = ctb.getcol("TIME") # Centroid is never used, hence it is commented out to improve efficiency # time_vis_row_centroid = ctb.getcol("TIME_CENTROID") flag = ctb.getcol("FLAG") flag_row = ctb.getcol("FLAG_ROW") scan_list = ctb.getcol("SCAN_NUMBER") field_ids = ctb.getcol("FIELD_ID") gen_info = _get_general_summary(ms_name, field_ids) # Here we use the median of the differences between dumps as this is a good proxy for the integration time if time_interval is None: time_interval = np.median(np.diff(np.unique(time_vis_row))) ctb.close() table_obj.close() map_ref_dict = _get_map_ref_dict( map_ant_tuple, ref_ant_per_map_ant_tuple, ant_names, ant_stations ) ( time_vis, vis_map_dict, weight_map_dict, flagged_mapping_antennas, used_samples_dict, scan_time_ranges, unq_scans, ) = _extract_holog_chunk_jit( vis_data, weight, ant1, ant2, time_vis_row, flag, flag_row, ref_ant_per_map_ant_tuple, map_ant_tuple, time_interval, scan_list, ) del vis_data, weight, ant1, ant2, time_vis_row, flag, flag_row, field_ids, scan_list map_ant_name_list = list(map(str, map_ant_name_tuple)) map_ant_name_list = ["_".join(("ant", i)) for i in map_ant_name_list] pnt_map_dict = _extract_pointing_chunk( map_ant_name_list, time_vis, pnt_mds, pointing_interpolation_method ) grid_params = {} # The loop has been moved out of the function here making the gridding parameter auto-calculation # function more general use (hopefully). I honestly couldn't see a reason to keep it inside. for ant_index in vis_map_dict.keys(): antenna_name = "_".join(("ant", ant_names[ant_index])) telescope = get_proper_telescope(gen_info["telescope name"], antenna_name) n_pix, cell_size = calculate_optimal_grid_parameters( pnt_map_dict, antenna_name, telescope.diameter, chan_freq, ddi_id ) grid_params[antenna_name] = {"n_pix": n_pix, "cell_size": cell_size} # ## To DO: ################## Average multiple repeated samples over_flow_protector_constant = float("%.5g" % # time_vis[0]) # For example 5076846059.4 -> 5076800000.0 time_vis = time_vis - over_flow_protector_constant # from astrohack.utils._algorithms import _average_repeated_pointings time_vis = _average_repeated_pointings( # vis_map_dict, weight_map_dict, flagged_mapping_antennas,time_vis,pnt_map_dict,ant_names) time_vis = time_vis + # over_flow_protector_constant _create_holog_file( holog_name, vis_map_dict, weight_map_dict, pnt_map_dict, time_vis, used_samples_dict, chan_freq, pol, flagged_mapping_antennas, map_key, ddi_key, ms_name, ant_names, grid_params, time_interval, gen_info, map_ref_dict, scan_time_ranges, unq_scans, holog_mds, ) logger.info(f"Finished extracting holography chunk for DDI {ddi_id}, {map_key}.")
def _get_map_ref_dict( map_ant_tuple, ref_ant_per_map_ant_tuple, ant_names, ant_stations ): map_dict = {} for ii, map_id in enumerate(map_ant_tuple): map_name = ant_names[map_id] ref_list = [] for ref_id in ref_ant_per_map_ant_tuple[ii]: ref_list.append(f"{ant_names[ref_id]} @ {ant_stations[ref_id]}") map_dict[map_name] = ref_list return map_dict @njit(cache=njit_caching, nogil=True) def _get_time_intervals(time_vis_row, scan_list, time_interval): unq_scans = np.unique(scan_list) scan_time_ranges = [] for scan in unq_scans: selected_times = time_vis_row[scan_list == scan] min_time, max_time = np.min(selected_times), np.max(selected_times) scan_time_ranges.append([min_time, max_time]) half_int = time_interval / 2 start = np.min(time_vis_row) + half_int total_time = np.max(time_vis_row) - start n_time = int(np.ceil(total_time / time_interval)) + 1 stop = start + n_time * time_interval raw_time_samples = np.linspace(start, stop, n_time + 1) filtered_time_samples = [] for time_sample in raw_time_samples: for time_range in scan_time_ranges: if time_range[0] <= time_sample <= time_range[1]: filtered_time_samples.append(time_sample) break time_samples = np.array(filtered_time_samples) return time_samples, scan_time_ranges, unq_scans @njit(cache=njit_caching, nogil=True) def _extract_holog_chunk_jit( vis_data, weight, ant1, ant2, time_vis_row, flag, flag_row, ref_ant_per_map_ant_tuple, map_ant_tuple, time_interval, scan_list, ): """JIT compiled function to extract relevant visibilty data from chunk after flagging and applying weights. Args: vis_data (numpy.ndarray): Visibility data (row, channel, polarization) weight (numpy.ndarray): Data weight values (row, polarization) ant1 (numpy.ndarray): List of antenna_ids for antenna1 ant2 (numpy.ndarray): List of antenna_ids for antenna2 time_vis_row (numpy.ndarray): Array of full time talues by row flag (numpy.ndarray): Array of data quality flags to apply to data flag_row (numpy.ndarray): Array indicating when a full row of data should be flagged ref_ant_per_map_ant_tuple(tuple): reference antenna per mapping antenna map_ant_tuple(tuple): mapping antennas? time_interval(float): time smoothing interval scan_list(list): list of valid holography scans Returns: dict: Antenna_id referenced (key) dictionary containing the visibility data selected by (time, channel, polarization) """ time_samples, scan_time_ranges, unq_scans = _get_time_intervals( time_vis_row, scan_list, time_interval ) n_time = len(time_samples) n_row, n_chan, n_pol = vis_data.shape half_int = time_interval / 2 vis_map_dict = {} sum_weight_map_dict = {} used_samples_dict = {} for antenna_id in map_ant_tuple: vis_map_dict[antenna_id] = np.zeros( (n_time, n_chan, n_pol), dtype=types.complex128, ) sum_weight_map_dict[antenna_id] = np.zeros( (n_time, n_chan, n_pol), dtype=types.float64, ) # This code here is to be uncommented and the snippet above commited for this function to work outside jit # vis_map_dict[antenna_id] = np.zeros( # (n_time, n_chan, n_pol), # dtype=np.complex128, # ) # sum_weight_map_dict[antenna_id] = np.zeros( # (n_time, n_chan, n_pol), # dtype=np.float64, # ) used_samples_dict[antenna_id] = np.full(n_time, False, dtype=bool) time_index = 0 for row in range(n_row): if flag_row is False: continue # Find index of time_vis_row[row] in time_samples, assumes time_vis_row is ordered in time if time_vis_row[row] < time_samples[time_index] - half_int: continue else: time_index = _get_time_index( time_vis_row[row], time_index, time_samples, half_int ) if time_index < 0: break ant1_id = ant1[row] ant2_id = ant2[row] if ant1_id in map_ant_tuple: indx = map_ant_tuple.index(ant1_id) conjugate = False ref_ant_id = ant2_id map_ant_id = ant1_id # mapping antenna index elif ant2_id in map_ant_tuple: indx = map_ant_tuple.index(ant2_id) conjugate = True ref_ant_id = ant1_id map_ant_id = ant2_id # mapping antenna index else: continue if ref_ant_id in ref_ant_per_map_ant_tuple[indx]: if conjugate: vis_baseline = np.conjugate(vis_data[row, :, :]) else: vis_baseline = vis_data[row, :, :] # n_chan x n_pol else: continue for chan in range(n_chan): for pol in range(n_pol): if ~(flag[row, chan, pol]): # Calculate running weighted sum of visibilities used_samples_dict[map_ant_id][time_index] = True vis_map_dict[map_ant_id][time_index, chan, pol] = ( vis_map_dict[map_ant_id][time_index, chan, pol] + vis_baseline[chan, pol] * weight[row, pol] ) # Calculate running sum of weights sum_weight_map_dict[map_ant_id][time_index, chan, pol] = ( sum_weight_map_dict[map_ant_id][time_index, chan, pol] + weight[row, pol] ) flagged_mapping_antennas = [] for map_ant_id in vis_map_dict.keys(): sum_of_sum_weight = 0 for time_index in range(n_time): for chan in range(n_chan): for pol in range(n_pol): sum_weight = sum_weight_map_dict[map_ant_id][time_index, chan, pol] sum_of_sum_weight = sum_of_sum_weight + sum_weight if sum_weight == 0: vis_map_dict[map_ant_id][time_index, chan, pol] = 0.0 else: vis_map_dict[map_ant_id][time_index, chan, pol] = ( vis_map_dict[map_ant_id][time_index, chan, pol] / sum_weight ) if sum_of_sum_weight == 0: flagged_mapping_antennas.append(map_ant_id) return ( time_samples, vis_map_dict, sum_weight_map_dict, flagged_mapping_antennas, used_samples_dict, scan_time_ranges, unq_scans, ) def _get_time_samples(time_vis): """Sample three values for time vis and cooresponding indices. Values are sammpled as (first, middle, last) Args: time_vis (numpy.ndarray): a list of visibility times Returns: numpy.ndarray, list: a select subset of visibility times (first, middle, last) """ n_time_vis = time_vis.shape[0] middle = int(n_time_vis // 2) indices = [0, middle, n_time_vis - 1] return np.take(time_vis, indices), indices def _create_holog_file( holog_name, vis_map_dict, weight_map_dict, pnt_map_dict, time_vis, used_samples_dict, chan, pol, flagged_mapping_antennas, map_key, ddi_key, ms_name, ant_names, grid_params, time_interval, gen_info, map_ref_dict, scan_time_ranges, unq_scans, holog_mds: AstrohackHologFile, ): """Create holog-structured, formatted output file and save to zarr. Args: holog_name (str): holog file name. vis_map_dict (dict): a nested dictionary/map of weighted visibilities indexed as [antenna][time, chan, pol]; \ mainains time ordering. weight_map_dict (dict): weights dictionary/map for visibilites in vis_map_dict pnt_map_dict (dict): pointing table map dictionary time_vis (numpy.ndarray): time_vis values chan (numpy.ndarray): channel values pol (numpy.ndarray): polarization values flagged_mapping_antennas (numpy.ndarray): list of mapping antennas that have been flagged. map_key(string): holog map id string ddi_key (string): data description id; a combination of polarization and spectral window """ ctb = ctables.table("/".join((ms_name, "ANTENNA")), ack=False) observing_location = ctb.getcol("POSITION") ctb.close() for map_ant_index in vis_map_dict.keys(): dataset_label = create_dataset_label( ant_names[map_ant_index], ddi_key.split("_")[1] ) if map_ant_index not in flagged_mapping_antennas: map_ant_key = f"ant_{ant_names[map_ant_index]}" pnt_xds = pnt_map_dict[map_ant_key] vis_data = vis_map_dict[map_ant_index] wei_data = weight_map_dict[map_ant_index] valid_data = used_samples_dict[map_ant_index] == 1.0 ant_time_vis = time_vis[valid_data] time_vis_days = ant_time_vis / (3600 * 24) astro_time_vis = astropy.time.Time(time_vis_days, format="mjd") time_samples, indicies = _get_time_samples(astro_time_vis) coords = {"time": ant_time_vis, "chan": chan, "pol": pol} direction = np.take( pnt_xds["DIRECTIONAL_COSINES"].values, indicies, axis=0, ) parallactic_samples = calculate_parallactic_angle_chunk( time_samples=time_samples, observing_location=observing_location[map_ant_index], direction=direction, ) xds = xr.Dataset() xds = xds.assign_coords(coords) xds["VIS"] = xr.DataArray( vis_data[valid_data, ...], dims=["time", "chan", "pol"], ) xds["WEIGHT"] = xr.DataArray( wei_data[valid_data, ...], dims=["time", "chan", "pol"], ) xds["DIRECTIONAL_COSINES"] = xr.DataArray( pnt_xds["DIRECTIONAL_COSINES"].values[valid_data, ...], dims=["time", "lm"], ) xds["IDEAL_DIRECTIONAL_COSINES"] = xr.DataArray( pnt_xds["POINTING_OFFSET"].values[valid_data, ...], dims=["time", "lm"], ) xds.attrs["parallactic_samples"] = parallactic_samples xds.attrs["time_smoothing_interval"] = time_interval xds.attrs["scan_time_ranges"] = scan_time_ranges xds.attrs["scan_list"] = unq_scans xds.attrs["summary"] = _create_observation_summary( gen_info, grid_params, xds["DIRECTIONAL_COSINES"].values, chan, pnt_xds, valid_data, map_ref_dict, ) holog_filename = holog_name logger.debug(f"Writing {dataset_label} holog data to {holog_filename}") holog_mds.add_node(xds, [map_ant_key, ddi_key, map_key]) else: logger.warning(f"No holography data for {dataset_label}") def _extract_pointing_chunk( map_ant_ids, time_vis, pnt_ant_dict, pointing_interpolation_method ): """Averages pointing within the time sampling of the visibilities Args: map_ant_ids (list): list of antenna ids time_vis (numpy.ndarray): sorted, unique list of visibility times pnt_ant_dict (dict): map of pointing directional cosines with a map key based on the antenna id and indexed by the MAIN table visibility time. Returns: dict: Dictionary of directional cosine data mapped to nearest MAIN table sample times. """ keys = ["DIRECTION", "DIRECTIONAL_COSINES", "ENCODER", "POINTING_OFFSET", "TARGET"] pnt_map_dict = {} coords = {"time": time_vis} for antenna in map_ant_ids: pnt_xds = pnt_ant_dict[antenna] y_data = [] for key in keys: y_data.append(pnt_xds[key].values) pnt_time = pnt_xds.time.values resample_pnt = grid_1d_data( time_vis, pnt_time, y_data, pointing_interpolation_method, f'{antenna.split("_")[1]} pointing data', "visibility times", ) new_pnt_xds = xr.Dataset() new_pnt_xds.assign_coords(coords) for i_key, key in enumerate(keys): new_pnt_xds[key] = xr.DataArray(resample_pnt[i_key], dims=("time", "az_el")) new_pnt_xds.attrs = pnt_xds.attrs pnt_map_dict[antenna] = new_pnt_xds return pnt_map_dict @njit(cache=njit_caching, nogil=True) def _get_time_index(data_time, i_time, time_axis, half_int): if i_time == time_axis.shape[0]: return -1 while data_time > time_axis[i_time] + half_int: i_time += 1 if i_time == time_axis.shape[0]: return -1 return i_time def _get_general_summary(ms_name, field_ids): unq_ids = np.unique(field_ids) field_tbl = ctables.table( ms_name + "::FIELD", readonly=True, lockoptions={"option": "usernoread"}, ack=False, ) i_src = int(unq_ids[0]) src_name = field_tbl.getcol("NAME") phase_center_fk5 = field_tbl.getcol("PHASE_DIR")[:, 0, :] field_tbl.close() obs_table = ctables.table( ms_name + "::OBSERVATION", readonly=True, lockoptions={"option": "usernoread"}, ack=False, ) time_range = casa_time_to_mjd(obs_table.getcol("TIME_RANGE")[0]) telescope_name = obs_table.getcol("TELESCOPE_NAME")[0] obs_table.close() phase_center_fk5[:, 0] = np.where( phase_center_fk5[:, 0] < 0, phase_center_fk5[:, 0] + twopi, phase_center_fk5[:, 0], ) gen_info = { "source": src_name[i_src], "phase center": phase_center_fk5[i_src].tolist(), "telescope name": telescope_name, "start time": time_range[0], # start time is in MJD in days "stop time": time_range[-1], # stop time is in MJD in days "duration": (time_range[-1] - time_range[0]) * 86400, # Store it in seconds rather than days } return gen_info def _get_az_el_characteristics(pnt_map_xds, valid_data): az_el = pnt_map_xds["ENCODER"].values[valid_data, ...] lm = pnt_map_xds["DIRECTIONAL_COSINES"].values[valid_data, ...] mean_az_el = np.mean(az_el, axis=0) median_az_el = np.median(az_el, axis=0) lmmid = lm.shape[0] // 2 lmquart = lmmid // 2 ilow = lmmid - lmquart iupper = lmmid + lmquart + 1 ic = np.argmin((lm[ilow:iupper, 0] ** 2 + lm[ilow:iupper, 1]) ** 2) + ilow center_az_el = az_el[ic] az_el_info = { "center": center_az_el.tolist(), "mean": mean_az_el.tolist(), "median": median_az_el.tolist(), } return az_el_info def _get_freq_summary(chan_axis): chan_width = np.abs(chan_axis[1] - chan_axis[0]) rep_freq = chan_axis[chan_axis.shape[0] // 2] freq_info = { "channel width": chan_width, "number of channels": chan_axis.shape[0], "frequency range": [ chan_axis[0] - chan_width / 2, chan_axis[-1] + chan_width / 2, ], "rep. frequency": rep_freq, "rep. wavelength": clight / rep_freq, } return freq_info def _create_observation_summary( obs_info, grid_params, lm, chan_axis, pnt_map_xds, valid_data, map_ref_dict, ): antenna_info = pnt_map_xds.attrs["antenna_info"] antenna_name = antenna_info["name"] station = antenna_info["station"] spw_info = _get_freq_summary(chan_axis) obs_info["az el info"] = _get_az_el_characteristics(pnt_map_xds, valid_data) obs_info["reference antennas"] = map_ref_dict[antenna_name] obs_info["antenna name"] = antenna_name obs_info["station"] = station l_max = np.max(lm[:, 0]) l_min = np.min(lm[:, 0]) m_max = np.max(lm[:, 1]) m_min = np.min(lm[:, 1]) beam_info = { "grid size": grid_params[f"ant_{antenna_name}"]["n_pix"], "cell size": grid_params[f"ant_{antenna_name}"]["cell_size"], "l extent": [l_min, l_max], "m extent": [m_min, m_max], } summary = { "spectral": spw_info, "beam": beam_info, "general": obs_info, "aperture": None, } return summary def _convert_ant_name_to_id( ant_name_list, ref_ant_names, ref_ant_ids, ): """_summary_ Args: ant_name_list (list): List of antennas for which to fetch ids ref_ant_names (list): Reference list of antenna names ref_ant_ids (list): Reference list of antenna ids Returns: list: List of antenna ids """ if not isinstance(ant_name_list, list): ant_name_list = [ant_name_list] ant_ids_list = [] for ant_name in ant_name_list: if ant_name in ref_ant_names: i_ids = ref_ant_names.index(ant_name) ant_ids_list.append(ref_ant_ids[i_ids]) return ant_ids_list