Source code for astrohack.antenna.telescope

from shapely.geometry.point import Point
from shapely.geometry.polygon import Polygon
from shapely.strtree import STRtree

import pathlib

import xarray as xr
import toolviper.utils.logger as logger

from astrohack.antenna.polygon_panel import PolygonPanel
from astrohack.utils.constants import *
from astrohack.antenna.ring_panel import RingPanel
from astrohack.utils.algorithms import create_coordinate_images, arm_shadow_masking
from astrohack.utils.ray_tracing_general import GlobalQPS
from astrohack.utils.package_info import get_astrohack_path


[docs] class Telescope: """ Base telescope class containing IO methods and attributes that are common to all telescopes. """
[docs] data_array_keys = ["point_cloud", "qps_coefficients"]
[docs] data_array_dims = [["point_axis", "xyz"], ["qps_coefficients"]]
[docs] excluded_keys = ["filepath", "filename", "z_cos_image"]
def __init__(self):
[docs] self.diameter = None
[docs] self.antenna_list = None
[docs] self.file_name = None
[docs] self.file_path = None
[docs] self.array_center = None
[docs] self.comment = None
[docs] self.inner_radial_limit = None
[docs] self.outer_radial_limit = None
[docs] self.name = None
[docs] self.el_axis_offset = None
[docs] self.station_distance_dict = None
[docs] self.gain_wavelengths = None
[docs] def read(self, filename): """ Read the telescope object from an X array .zarr telescope configuration file Args: filename: name of the input file """ try: logger.debug("Reading telescope data from: filename") xds = xr.open_zarr(filename) for key in xds.attrs: setattr(self, key, xds.attrs[key]) for key in self.data_array_keys: try: setattr(self, key, xds[key].values) except KeyError: pass except FileNotFoundError: logger.error(f"Telescope file not found: {filename}") raise FileNotFoundError relative_path = pathlib.Path(filename) abs_path = relative_path.resolve() self.file_name = str(abs_path.name) self.file_path = str(abs_path.parent)
[docs] def read_from_distro(self, name): """ Read telescope info from files distributed with astrohack. Args: name: Name of the telescope to be read. Returns: None """ dest_path = get_astrohack_path() / f"data/telescopes/{name.lower()}.zarr" self.read(dest_path)
[docs] def write(self, filename): """ Write the telescope object to an X array .zarr telescope configuration file Args: filename: Name of the output file """ obj_dict = vars(self) for key in self.excluded_keys: obj_dict.pop(key, None) xds = xr.Dataset() for ikey, key in enumerate(self.data_array_keys): try: data = obj_dict.pop(key) xds[key] = xr.DataArray(data, dims=self.data_array_dims[ikey]) except KeyError: pass xds.attrs = obj_dict logger.debug("Writing telescope data to: filename") xds.to_zarr(filename, mode="w", compute=True, consolidated=True) return
[docs] def write_to_distro(self): dest_path = ( get_astrohack_path() / f'data/telescopes/{self.name.lower().replace(" ", "_")}.zarr' ) self.write(dest_path)
def __repr__(self): """ Simple print function that prints all class attributes. Returns: String containing descriptions of all object attributes. """ outstr = "" obj_dict = vars(self) for key, item in obj_dict.items(): if isinstance(item, dict): key_list = list(item.keys()) outstr += f"{key:20s} = dict({str(key_list)})\n" else: outstr += f"{key:20s} = {str(item)}\n" return outstr
[docs] class RingedCassegrain(Telescope): """ Derived class containing description and methods pertaining to telescope whose panels are distributed in concentric rings from the dish center. """ def __init__(self): super().__init__()
[docs] self.panel_inner_radii = None
[docs] self.panel_outer_radii = None
[docs] self.focus = None
[docs] self.arm_shadow_rotation = None
[docs] self.arm_shadow_width = None
[docs] self.magnification = None
[docs] self.n_panel_per_ring = None
[docs] self.panel_numbering = None
[docs] self.screw_description = None
[docs] self.screw_offset = None
[docs] self.secondary_distance_to_focus = None
[docs] self.secondary_support_shape = None
[docs] self.surp_slope = None
[docs] self.n_rings_of_panels = None
self._panel_label = None @classmethod
[docs] def from_name(cls, name): """ Initialize and read from the distro a telescope object. Args: name: Name of the telescope to be read. Returns: RingedCassegrain object """ obj = cls() obj.read_from_distro(name) return obj
[docs] def consistency_check(self): """ Make a simple check to test that some of its attributes are Returns: None """ error = False if self.panel_outer_radii[-1] > self.diameter / 2.0: logger.error("Panel description goes beyond dish outermost radius") error = True if ( not self.n_rings_of_panels == len(self.panel_inner_radii) == len(self.panel_outer_radii) ): logger.error( "Number of panels don't match radii or number of panels list sizes" ) error = True if error: raise RuntimeError("Failed Consistency check") else: print("Consistency passed") return
@staticmethod def _vla_panel_labeling(iring, ipanel): """ Provide the correct panel label for VLA style panels Args: iring: Number of the ring the panel is in ipanel: Number of the panel in that ring clockwise from the top Returns: The proper label for the panel at iring, ipanel """ return "{0:d}-{1:d}".format(iring + 1, ipanel + 1) def _alma_panel_labeling(self, iring, ipanel): """ Provide the correct panel label for ALMA style panels, which is more complicated than VLA panels due to the implementation of panel sectors Args: iring: Number of the ring the panel is in ipanel: Number of the panel in that ring clockwise from the top Returns: The proper label for the panel at iring, ipanel """ angle = twopi / self.n_panel_per_ring[iring] sector_angle = twopi / self.n_panel_per_ring[0] theta = twopi - (ipanel + 0.5) * angle sector = int( ((theta / sector_angle) + 1 + self.n_panel_per_ring[0] / 4) % self.n_panel_per_ring[0] ) if sector == 0: sector = self.n_panel_per_ring[0] nppersec = self.n_panel_per_ring[iring] / self.n_panel_per_ring[0] jpanel = int(nppersec - (ipanel % nppersec)) return "{0:1d}-{1:1d}{2:1d}".format(sector, iring + 1, jpanel)
[docs] def build_panel_list(self, panel_model, panel_margins): """ Construct a list of panel objects according to the telescope description Args: panel_model: Type of panel model to be fitted. panel_margins: how much of the panel Returns: List containing RingPanel objects """ if self.name in ["VLA", "VLBA"]: self._panel_label = self._vla_panel_labeling elif "ALMA" in self.name or self.name == "ACA 7m": self._panel_label = self._alma_panel_labeling else: raise NotImplementedError( f"Don't know how to build panel list for {self.name}" ) panel_list = [] for iring in range(self.n_rings_of_panels): angle = twopi / self.n_panel_per_ring[iring] for ipanel in range(self.n_panel_per_ring[iring]): panel = RingPanel( panel_model, angle, ipanel, self._panel_label(iring, ipanel), self.panel_inner_radii[iring], self.panel_outer_radii[iring], margin=panel_margins, screw_scheme=self.screw_description, screw_offset=self.screw_offset, plot_screw_size=0.006 * self.diameter, ) panel_list.append(panel) return panel_list
[docs] def attribute_pixels_to_panels( self, panel_list, u_axis, v_axis, radius, phi, deviation, mask ): """ Attribute pixels in deviation image to the panels in the panel_list Args: panel_list: The panel list must have been created by build_panel_list for the same instrument u_axis: Aperture U axis v_axis: Aperture V axis radius: Aperture radius map phi: Aperture phi angle map deviation: Aperture deviation mask: Aperture mask Returns: map of panel attributions """ panel_map = np.full_like(radius, -1) panelsum = 0 for iring in range(self.n_rings_of_panels): angle = twopi / self.n_panel_per_ring[iring] panel_map = np.where( radius >= self.panel_inner_radii[iring], np.floor(phi / angle) + panelsum, panel_map, ) panelsum += self.n_panel_per_ring[iring] panel_map = np.where(radius >= self.panel_inner_radii[0], panel_map, np.nan) panel_map = np.where(radius > self.panel_outer_radii[-1], np.nan, panel_map) u_mesh, v_mesh = create_coordinate_images(u_axis, v_axis) for ix, xc in enumerate(u_axis): for iy, yc in enumerate(v_axis): ipanel = panel_map[ix, iy] if ipanel >= 0: xc = u_mesh[ix, iy] yc = v_mesh[ix, iy] panel = panel_list[int(ipanel)] issample, inpanel = panel.is_inside(radius[ix, iy], phi[ix, iy]) if inpanel: if issample and mask[ix, iy]: panel.add_sample([xc, yc, ix, iy, deviation[ix, iy]]) else: panel.add_margin([xc, yc, ix, iy, deviation[ix, iy]]) return panel_map
[docs] def create_aperture_mask( self, u_axis, v_axis, use_detailed_mask=True, return_polar_meshes=False, use_outer_limit=False, ): u_mesh, v_mesh, radius_mesh, polar_angle_mesh = create_coordinate_images( u_axis, v_axis, create_polar_coordinates=True ) if use_outer_limit: outer_radius = self.outer_radial_limit else: outer_radius = self.diameter / 2.0 mask = np.full_like(radius_mesh, True, dtype=bool) mask = np.where(radius_mesh > outer_radius, False, mask) mask = np.where(radius_mesh < self.inner_radial_limit, False, mask) if self.arm_shadow_width is None or not use_detailed_mask: pass elif isinstance(self.arm_shadow_width, (float, int)): mask = arm_shadow_masking( mask, u_mesh, v_mesh, radius_mesh, self.inner_radial_limit, outer_radius, self.arm_shadow_width, self.arm_shadow_rotation, ) elif isinstance(self.arm_shadow_width, list): for section in self.arm_shadow_width: minradius, maxradius, width = section mask = arm_shadow_masking( mask, u_mesh, v_mesh, radius_mesh, minradius, maxradius, width, self.arm_shadow_rotation, ) else: raise TypeError( f"Don't know how to handle an arm width of class {type(self.arm_shadow_width)}" ) if return_polar_meshes: return mask, radius_mesh, polar_angle_mesh else: return mask
[docs] def phase_to_deviation(self, u_axis, v_axis, _, phase, wavelength): """ Transform phase image to physical deviation image based on wavelength. Args: u_axis: Aperture U axis v_axis: Aperture V axis _: dummy argument for interface compatibility phase: Phase image in Radians wavelength: Observation wavelength in meters Returns: Deviation image. """ _, _, radius, _ = create_coordinate_images( u_axis, v_axis, create_polar_coordinates=True ) acoeff = (wavelength / twopi) / (4.0 * self.focus) bcoeff = 4 * self.focus**2 return acoeff * phase * np.sqrt(radius**2 + bcoeff)
[docs] def deviation_to_phase(self, u_axis, v_axis, _, deviation, wavelength): """ " Transform deviation image to physical phase image based on wavelength. Args: u_axis: Aperture U axis v_axis: Aperture V axis _: dummy argument for interface compatibility deviation: Deviation image in meters wavelength: Observation wavelength in meters Returns: Phase image. """ _, _, radius, _ = create_coordinate_images( u_axis, v_axis, create_polar_coordinates=True ) acoeff = (wavelength / twopi) / (4.0 * self.focus) bcoeff = 4 * self.focus**2 return deviation / (acoeff * np.sqrt(radius**2 + bcoeff))
[docs] class NgvlaPrototype(Telescope): """ Derived class to contain ngVLA prototype specific methods and attributes. """ def __init__(self): super().__init__()
[docs] self.panel_dict = None
[docs] self.screw_description = None
[docs] self.point_cloud = None
[docs] self.qps_coefficients = None
[docs] self.aperture_polygon = None
# This is not to be written to disk
[docs] self.z_cos_image = None
@classmethod
[docs] def from_name(cls, name): obj = cls() obj.read_from_distro(name) return obj
[docs] def build_panel_list(self, panel_model, panel_margins): panel_list = [] for panel_label, panel_info in self.panel_dict.items(): panel = PolygonPanel(panel_label, panel_model, panel_info, panel_margins) panel_list.append(panel) return panel_list
@staticmethod
[docs] def attribute_pixels_to_panels( panel_list, u_axis, v_axis, radius, _, deviation, mask ): u_mesh, v_mesh = create_coordinate_images(u_axis, v_axis) panel_map = np.full_like(radius, np.nan) for ix, xc in enumerate(u_axis): for iy, yc in enumerate(v_axis): if mask[ix, iy]: xc = u_mesh[ix, iy] yc = v_mesh[ix, iy] for ipanel, panel in enumerate(panel_list): issample, inpanel = panel.is_inside(xc, yc) if inpanel: if issample: panel.add_sample([xc, yc, ix, iy, deviation[ix, iy]]) else: panel.add_margin([xc, yc, ix, iy, deviation[ix, iy]]) panel_map[ix, iy] = ipanel break return panel_map
[docs] def create_aperture_mask( self, u_axis, v_axis, use_detailed_mask=True, return_polar_meshes=False, use_outer_limit=False, ): u_mesh, v_mesh, radius_mesh, polar_angle_mesh = create_coordinate_images( u_axis, v_axis, create_polar_coordinates=True ) if use_detailed_mask: # This is the slowest line, means of optimizing this would be great point_list = [Point(u_val, -v_val) for v_val in v_axis for u_val in u_axis] pnt_str_tree = STRtree(point_list) ap_polygon = Polygon(self.aperture_polygon) intersection = pnt_str_tree.query(ap_polygon, predicate="intersects") mask_1d = np.full((v_axis.shape[0] * u_axis.shape[0]), False) mask_1d[intersection] = True mask = mask_1d.reshape((u_axis.shape[0], v_axis.shape[0])) else: if use_outer_limit: outer_radius = self.outer_radial_limit else: outer_radius = self.diameter / 2.0 mask = np.full_like(radius_mesh, True, dtype=bool) mask = np.where(radius_mesh > outer_radius, False, mask) # This line does not need to be included as there is no blockage in the ngvla! # mask = np.where(radius_mesh < self.inner_radial_limit, False, mask) if return_polar_meshes: return mask, radius_mesh, polar_angle_mesh else: return mask
[docs] def phase_to_deviation(self, u_axis, v_axis, mask, phase, wavelength): if self.z_cos_image is None: global_qps = GlobalQPS.from_point_cloud_and_coefficients( self.point_cloud, self.qps_coefficients ) self.z_cos_image = global_qps.compute_gridded_z_cos(u_axis, v_axis, mask) deviation = phase * wavelength / fourpi / self.z_cos_image return deviation
[docs] def deviation_to_phase(self, u_axis, v_axis, mask, deviation, wavelength): if self.z_cos_image is None: global_qps = GlobalQPS.from_point_cloud_and_coefficients( self.point_cloud, self.qps_coefficients ) self.z_cos_image = global_qps.compute_gridded_z_cos(u_axis, v_axis, mask) phase = fourpi * self.z_cos_image * deviation / wavelength return phase
[docs] def get_proper_telescope(name: str, antenna_name: str = None): """ Retrieve the proper telescope object based on the name Args: name: Name of the telescope antenna_name: Name of the antenna, significant for heterogenius arrays. Returns: A telescope object of one of the proper subclasses """ name = name.lower() if isinstance(antenna_name, str): antenna_name = antenna_name.lower() if "ngvla" in name: return NgvlaPrototype.from_name("ngvla_proto_2025") elif "vla" in name: if antenna_name is None or "ea" in antenna_name or antenna_name == "all": return RingedCassegrain.from_name("vla") elif "na" in antenna_name: return NgvlaPrototype.from_name("ngvla_proto_2025") else: raise ValueError(f"Unsupported antenna type for the VLA: {antenna_name}") elif "vlba" in name: return RingedCassegrain.from_name("vlba") elif "alma" in name: if antenna_name is None: raise ValueError( "ALMA is an heterogenious array and hence an antenna name is needed" ) elif "dv" in antenna_name: return RingedCassegrain.from_name("alma_dv") elif "da" in antenna_name: return RingedCassegrain.from_name("alma_da") elif "tp" in antenna_name: return RingedCassegrain.from_name("alma_tp") else: raise ValueError(f"Unsupported antenna type for ALMA: {antenna_name}") elif "aca" in name: return RingedCassegrain.from_name("aca_7m") else: return None