Source code for astrohack.utils.imaging

import math
import scipy.ndimage
import numpy as np
import astropy.units as units
import astropy.coordinates as coord
from numba import njit
import scipy.fftpack
import time

import toolviper.utils.logger as logger

from astrohack.utils.algorithms import calc_coords
from astrohack.utils.gridding import gridding_correction
from astrohack.utils.constants import clight, njit_caching


[docs] def calculate_parallactic_angle_chunk( time_samples, observing_location, direction, dir_frame="FK5", zenith_frame="FK5", ): """ Converts a direction and zenith (frame FK5) to a topocentric Altitude-Azimuth (https://docs.astropy.org/en/stable/api/astropy.coordinates.AltAz.html) frame centered at the observing_location (frame ITRF) for a UTC time. The parallactic angles is calculated as the position angle of the Altitude-Azimuth direction and zenith. Parameters ---------- time_samples: str np.array, [n_time], 'YYYY-MM-DDTHH:MM:SS.SSS' UTC time series. Example '2019-10-03T19:00:00.000'. observing_location: int np.array, [3], [x,y,z], meters ITRF geocentric coordinates. direction: float np.array, [n_time,2], [time,ra,dec], radians The pointing direction. dir_frame: frame of refecerence of the direction zenith_frame: Frame of reference of Zenith's coordinates? Returns ------- parallactic_angles: float np.array, [n_time], radians An array of parallactic angles. """ observing_location = coord.EarthLocation.from_geocentric( x=observing_location[0] * units.m, y=observing_location[1] * units.m, z=observing_location[2] * units.m, ) direction = coord.SkyCoord( ra=direction[:, 0] * units.rad, dec=direction[:, 1] * units.rad, frame=dir_frame.lower(), ) zenith = coord.SkyCoord(0, 90, unit=units.deg, frame=zenith_frame.lower()) altaz_frame = coord.AltAz(location=observing_location, obstime=time_samples) zenith_altaz = zenith.transform_to(altaz_frame) direction_altaz = direction.transform_to(altaz_frame) return direction_altaz.position_angle(zenith_altaz).value
[docs] def parallactic_derotation(data, parallactic_angle_dict): """Uses samples of parallactic angle (PA) values to correct differences in PA between maps. The reference PA is selected to be the first maps median parallactic angle. All values are rotated to this PA value using scipy.ndimage.rotate(...) Args: data (numpy.ndarray): beam data grid (map, chan, pol, l, m) parallactic_angle_dict (dict): dictionary containing antenna selected xds from which the parallactic angle samples are retrieved ==> [map](xds), here the map referred to the map values not the map index. Returns: numpy.ndarray: rotation adjusted beam data grid """ # Find the middle index of the array. This is calculated because there might be a desire to change # the array length at some point and I don't want to hard code the middle value. # # It is assumed, and should be true, that the parallactic angle array size is consistent over map. maps = list(parallactic_angle_dict.keys()) # Get the median index for the first map (this should be the same for every map). # median_index = len(parallactic_angle_dict[maps[0]].parallactic_samples) // 2 # This is the angle we will rotate the maps to. # median_angular_reference = parallactic_angle_dict[maps[0]].parallactic_samples[median_index] for mapping, map_value in enumerate(maps): # median_angular_offset = median_angular_reference - parallactic_angle_dict[map_value].parallactic_samples[ # median_index] median_angular_offset *= 180/np.pi # parallactic_angle = 360 - parallactic_angle_dict[map_value].parallactic_samples[median_index]*180/np.pi data[mapping] = scipy.ndimage.rotate( input=data[mapping, ...], angle=90, axes=(3, 2), reshape=False ) return data
[docs] def calculate_far_field_aperture( grid, padding_factor, freq, telescope, sky_cell_size, apply_grid_correction, label ): """Calculates the aperture illumination pattern from the beam data. Args: grid (numpy.ndarray): gridded beam data padding_factor (int, optional): Padding to apply to beam data grid before FFT. Padding is applied on outer edged of each beam data grid and not between layers. Defaults to 20. freq: Beam grid frequency axis telescope: telescope object with optical parameters sky_cell_size: Sky cell size (radians) apply_grid_correction: Apply grid correction (True for gaussian convolution of the beam) label: Data label for messages Returns: aperture grid, u-coordinate array, v-coordinate array, aperture cell size, representative wavelength """ start = time.time() logger.debug(f"{label}: Calculating far field aperture illumination pattern ...") padded_grid = _pad_beam_image(grid, padding_factor) aperture_grid = _compute_aperture_fft(padded_grid) wavelength = clight / freq[0] u_axis, v_axis, _, _, aperture_cell_size = _compute_axes( padded_grid.shape, sky_cell_size, wavelength ) if apply_grid_correction: aperture_grid = gridding_correction( aperture_grid, freq, telescope.diameter, sky_cell_size, u_axis, v_axis ) duration = time.time() - start logger.debug(f"{label}: Far field aperture took {duration:.3} seconds") return aperture_grid, u_axis, v_axis, aperture_cell_size, wavelength
[docs] def calculate_near_field_aperture( grid, sky_cell_size, distance, freq, padding_factor, focus_offset, telescope, apply_grid_correction, label, apodize=True, ): """ " Calculates the aperture illumination pattern from the near_fiedl beam data. Args: grid (numpy.ndarray): gridded beam data sky_cell_size (float): incremental spacing between lm values, i.e. delta_l = l_(n+1) - l_(n) padding_factor (int, optional): Padding to apply to beam data grid before FFT. Padding is applied on outer edges of each beam data grid and not between layers. Defaults to 20. distance: distance to holographic tower focus_offset: Offset from primary focus on holographic receiver freq: Beam grid frequency axis telescope: telescope object with optical parameters apply_grid_correction: Apply grid correction (True for gaussian convolution of the beam) apodize: Apodize beam to avoid boxing effects in the FFT (the dashed line cross) label: Data label for messages Returns: aperture grid, u-coordinate array, v-coordinate array, aperture cell size, representative wavelength """ logger.debug(f"{label}: Calculating near field aperture illumination pattern ...") start = time.time() work_grid = grid.copy() if apodize: apodizer = _apodize_beam(work_grid[0, 0, 0, ...]) work_grid[0, 0, 0, ...] *= apodizer padded_grid = _pad_beam_image(work_grid, padding_factor) wavelength = clight / freq[0] z_max = (telescope.diameter / 2) ** 2 / 4 / telescope.focus scale = 1.0 + (telescope.el_axis_off + z_max / 2.0) / distance u_axis, v_axis, l_axis, m_axis, aperture_cell_size = _compute_axes( padded_grid.shape, sky_cell_size, wavelength, scale=scale ) aperture_grid = _compute_aperture_fft(padded_grid) factor = 2j * np.pi / wavelength aperture_grid = _compute_non_fresnel_corrections( padded_grid, aperture_grid, l_axis, m_axis, u_axis, v_axis, factor, distance ) if apply_grid_correction: aperture_grid = gridding_correction( aperture_grid, freq, telescope.diameter, sky_cell_size, u_axis, v_axis ) aperture_grid = _correct_phase_nf_effects( aperture_grid, u_axis, v_axis, distance, focus_offset, telescope.focus, factor ) # phase = np.angle(aperture_grid[0, 0, 0, ...]) amp = np.absolute(aperture_grid[0, 0, 0, ...]) # dish horn_artefact = fit_dish horn_beam_artefact(amp, telescope.inlim, u_axis, v_axis) # amp -= dish horn_artefact phase = _feed_correction(phase, u_axis, v_axis, telescope.focus) # fitted_amp = fit_illumination_pattern(amp, u_axis, v_axis, telescope.diam, blockage) # aperture_grid[0, 0, 0, ...] = fitted_amp * (np.cos(phase) + 1j * np.sin(phase)) aperture_grid[0, 0, 0, ...] = amp * (np.cos(phase) + 1j * np.sin(phase)) duration = time.time() - start logger.debug(f"{label}: Near field aperture took {duration:.3} seconds") return aperture_grid, u_axis, v_axis, aperture_cell_size, wavelength
def _feed_correction(phase, u_axis, v_axis, focal_length, nk=10): """ Correction to the phases due to the phase change created by the antenna feed Args: phase: Aperture phase u_axis: U axis v_axis: V Axis focal_length: Telescope focal length nk: number of terms to include Returns: """ # Tabulated Sigma GE and GH functions: gh_tab = [ 13.33004 - 0.03155j, -1.27077 + 0.00656j, 0.38349 - 0.17755j, 0.78041 - 0.11238j, -0.54821 + 0.16739j, -0.68021 + 0.11472j, 1.05341 - 0.01921j, -0.80119 + 0.06443j, 0.36258 - 0.01845j, -0.07905 + 0.00515j, ] ge_tab = [ 12.79400 + 2.27305j, 1.06279 - 0.56235j, -1.92694 - 1.72309j, 1.79152 - 0.08008j, 0.09406 + 0.46197j, -2.82441 - 1.06010j, 2.77077 + 1.02349j, -1.74437 - 0.45956j, 0.62276 + 0.11504j, -0.11176 + 0.01616j, ] umesh, vmesh = np.meshgrid(u_axis, v_axis) radius2 = umesh**2 + vmesh**2 theta2 = radius2 / 4 / focal_length**2 theta = np.sqrt(theta2) z_term = (1 - theta2 + 2j * theta) / (1 + theta2) zz_term = np.full_like(phase, 1 + 0j, dtype=complex) geth = np.zeros_like(phase, dtype=complex) ghth = np.zeros_like(phase, dtype=complex) for k in range(nk): zz_term *= z_term costh = zz_term.real geth += ge_tab[k] * costh ghth += gh_tab[k] * costh phi = np.arctan2(vmesh, umesh) gain = geth * np.sin(phi) ** 2 + ghth * np.cos(phi) ** 2 feed_phase = np.angle(gain) return phase + feed_phase def _circular_gaussian(axes, amp, x0, y0, sigma, offset): """ Compute a Circular gaussian image Args: axes: X and Y axes mesh grids amp: Gaussian amplitude x0: Gaussian center in X y0: Gaussian center in Y sigma: Gaussian width offset: gaussian base offset Returns: circular gaussian image """ x_axis, y_axis = axes x0 = float(x0) y0 = float(y0) expo = 1 * ((x_axis - x0) ** 2 + (y_axis - y0) ** 2) expo /= 2 * sigma**2 return amp * np.exp(-expo) + offset def _pad_beam_image(grid, padding_factor): """ Pad beam image with zeros to avoid aliasing in FFTs Args: grid: beam grid padding_factor: padding factor to determine padded size Returns: Zero padded beam grid """ assert ( grid.shape[-1] == grid.shape[-2] ) ###To do: why is this expected that l.shape == m.shape initial_dimension = grid.shape[-1] # Calculate padding as the nearest power of 2 # k log (2) = log(N) => k = log(N)/log(2) # New shape => K = math.ceil(k) => shape = (K, K) k_coeff = np.log(initial_dimension * padding_factor) / np.log(2) k_integer = math.ceil(k_coeff) padded_size = np.power(2, k_integer) padding = (padded_size - initial_dimension) // 2 z_pad = [0, 0] if initial_dimension == initial_dimension // 2 * 2: pad_wid = [padding, padding] else: pad_wid = [padding + 1, padding] pad_width = np.array([z_pad, z_pad, z_pad, pad_wid, pad_wid]) padded_grid = np.pad(array=grid, pad_width=pad_width, mode="constant") return padded_grid @njit(cache=njit_caching, nogil=True) def _apodize_beam(unpadded_beam, degree=2): """ Apodize beam image to avoid artefacts in aperture image Args: unpadded_beam: Unpadded beam image degree: Degree of apodization Returns: Apodizing image """ nx, ny = unpadded_beam.shape apodizer = np.zeros(unpadded_beam.shape) for ix in range(nx): xfac = 4 * (ix - nx - 1) * (ix - 1) / (nx**degree) for iy in range(ny): yfac = 4 * (iy - ny - 1) * (iy - 1) / (ny**degree) apodizer[ix, iy] = xfac * yfac return apodizer def _correct_phase_nf_effects( aperture, u_axis, v_axis, distance, focus_offset, focal_length, factor ): """ DOES NOT WORK AS INTENDED Compute the near field corrections to be applied to the phases Args: aperture: Aperture Grid image u_axis: U axis v_axis: V axis distance: Distance to holographic tower focus_offset: Defocus used to mitigate NF effects focal_length: Telescope focal length factor: wave number scaling Returns: Aperture with phase corrected for the NF effects """ umesh, vmesh = np.meshgrid(u_axis, v_axis) wave_vector = factor axis_dist2 = umesh**2 + vmesh**2 # \epsilon^2 + \eta^2 z_term = axis_dist2 / 4 / focal_length # \frac{\epsilon^2 + \eta^2}{4f} first_term = axis_dist2 / 2 / distance # \frac{\epsilon^2 + \eta^2}{2R} second_term = ( axis_dist2**2 / 8 / distance**3 ) # \frac{(\epsilon^2 + \eta^2)^2}{8R^3} dp1 = first_term - second_term # \sqrt{(\epsilon^2 + \eta^2) + (f - \frac{\epsilon^2 + \eta^2}{4f} +df)^2} first_term = np.sqrt(axis_dist2 + (focal_length + focus_offset - z_term) ** 2) # f - \frac{\epsilon^2 + \eta^2}{4f} +df second_term = focal_length + z_term + focus_offset dp2 = first_term - second_term path_var = dp1 + dp2 aperture[0, 0, 0, ...] *= np.exp(wave_vector * path_var) return aperture def _compute_axes(shape, sky_cell_size, wavelength, scale=1.0): """ Compute the axes of the padded beam image and also the aperture axes Args: shape: Padded beam image shape sky_cell_size: Sky cell size (radians) wavelength: representative wavelenght scale: axis scaling Returns: U, V, L and M axis and aperture cell size """ image_size = np.array([shape[-2], shape[-1]]) aperture_cell_size = wavelength / (image_size * scale * sky_cell_size) u_axis, v_axis = calc_coords(image_size, aperture_cell_size) l_axis, m_axis = calc_coords(image_size, scale * sky_cell_size) return u_axis, v_axis, l_axis, m_axis, aperture_cell_size def _compute_aperture_fft(padded_grid): """ Compute aperture FFT Args: padded_grid: Zero padded beam grid Returns: Aperture """ shifted = scipy.fftpack.ifftshift(padded_grid) grid_fft = scipy.fftpack.fft2(shifted) aperture_grid = scipy.fftpack.fftshift(grid_fft) return aperture_grid def _compute_non_fresnel_corrections( padded_grid, aperture_grid, l_axis, m_axis, u_axis, v_axis, factor, distance, max_it=6, verbose=True, ): """ DOES NOT WORK AS INTENDED Compute non fresnel corrections to the aperture Args: padded_grid: Zero padded beam grid aperture_grid: The FFT of the padde beam grid l_axis: Beam's L axis m_axis: Beam's M axis u_axis: Aperture's U axis v_axis: Aperture's V axis factor: Wave number distance: Distance to the hologrphy tower max_it: Number of iterations verbose: Print messages? Returns: Aperture with non fresnel corrections """ mega_max_it = 6 if verbose: logger.info("Applying non-fresnel corrections...") wave_vector = factor lmesh, mmesh = np.meshgrid(l_axis, m_axis) umesh, vmesh = np.meshgrid(u_axis, v_axis) u2mesh = np.power(umesh, 2) v2mesh = np.power(vmesh, 2) dist2 = u2mesh + v2mesh it = 1 while it < max_it: corr_term = None if it < 1 or it >= mega_max_it: raise RuntimeError(f"Maximum number of iterations is {mega_max_it}") fft_work_array = padded_grid[0, 0, 0, ...].copy() if it == 1: fft_work_array *= lmesh elif it == 2: fft_work_array *= mmesh elif it == 3: fft_work_array *= np.power(lmesh, 2) elif it == 4: fft_work_array *= np.power(mmesh, 2) elif it == 5: fft_work_array *= lmesh * mmesh fft_term = _compute_aperture_fft(fft_work_array) if it == 1: corr_term = umesh * dist2 / 2 / distance**2 * wave_vector elif it == 2: corr_term = vmesh * dist2 / 2 / distance**2 * wave_vector elif it == 3: corr_term = -1 * u2mesh / 2 / distance * wave_vector elif it == 4: corr_term = -1 * v2mesh / 2 / distance * wave_vector elif it == 5: corr_term = -1 * umesh * vmesh / distance * wave_vector add_term = corr_term * fft_term aperture_grid += add_term it += 1 return aperture_grid