Source code for astrohack.utils.fits

from toolviper.utils import logger as logger

import datetime
import numpy as np

from astropy.io import fits

from astrohack.utils.text import add_prefix


[docs] def get_stokes_axis_iaxis(header): """ Get which of the axis in the header is the stokes axis Args: header: FITS header Returns: None if no stokes axis is found, iaxis if stokes axis is found """ naxis = header["NAXIS"] for iaxis in range(naxis): axis_type = safe_keyword_fetch(header, f"CTYPE{iaxis+1}") if "STOKES" in axis_type: return iaxis + 1 return None
[docs] def safe_keyword_fetch(header_dict, keyword): """ Tries to fetch a keyword from a FITS header / dictionary Args: header_dict: FITS header / Dictionary keyword: The intended keyword to fetch Returns: Keyword value if prensent, None if not present. """ try: return header_dict[keyword] except KeyError: return None
[docs] def read_fits_no_checks(filename): """ Brute force reading of a fits file, no checks are performed :param filename: Fits filename :return: FITS header as a dict and associated data """ hdul = fits.open(filename) head = hdul[0].header data = hdul[0].data hdul.close() header_dict = {} for key, value in head.items(): header_dict[key] = value return header_dict, data
[docs] def read_fits(filename, header_as_dict=True): """ Reads a square FITS file and do sanity checks on its dimensionality Args: filename: a string containing the FITS file name/path header_as_dict: return header as dictionary Returns: The FITS header and the associated data array """ hdul = fits.open(filename) head = hdul[0].header data = hdul[0].data hdul.close() if head["NAXIS"] != 1: if head["NAXIS"] < 1: raise ValueError(filename + " is not bi-dimensional") elif head["NAXIS"] > 1: for iax in range(2, head["NAXIS"]): if head["NAXIS" + str(iax + 1)] != 1: raise ValueError(filename + " is not bi-dimensional") if head["NAXIS1"] != head["NAXIS2"]: raise ValueError( filename + " does not have the same amount of pixels in the x and y axes" ) if header_as_dict: header_dict = {} for key, value in head.items(): header_dict[key] = value return header_dict, data else: return head, data
[docs] def get_axis_from_fits_header(header, iaxis, pixel_offset=True): """ Pull axis information from FITS file and store it in a numpy array, ignores rotation in axes. Args: header: FITS header iaxis: Which axis is to be fetched from the header. pixel_offset: apply one pixel offset Returns: numpy array representation of axis, axis type and axis unit """ n_elem = header[f"NAXIS{iaxis}"] ref = header[f"CRPIX{iaxis}"] inc = header[f"CDELT{iaxis}"] if pixel_offset: val = ( header[f"CRVAL{iaxis}"] + inc ) # This makes this routine symmetrical to the put routine. else: val = header[f"CRVAL{iaxis}"] axis = np.arange(n_elem) axis = val + (ref - axis) * inc axis_unit = safe_keyword_fetch(header, f"CUNIT{iaxis}") axis_type = safe_keyword_fetch(header, f"CTYPE{iaxis}") return axis, axis_type, axis_unit
[docs] def write_fits(header, imagetype, data, filename, unit, origin=None, reorder_axis=True): """ Write a dictionary and a dataset to a FITS file Args: header: The dictionary containing the header imagetype: Type to be added to FITS header data: The dataset filename: The name of the output file unit: to be set to bunit origin: Which astrohack mds has created the FITS being written reorder_axis: Reorder data axes so that they are compatible with regular FITS ordering """ from astrohack.utils.package_info import get_astrohack_version current_version = get_astrohack_version() header["BUNIT"] = unit header["TYPE"] = imagetype header["ORIGIN"] = f"Astrohack v{current_version}: {origin}" header["DATE"] = datetime.datetime.now().strftime("%b %d %Y, %H:%M:%S") if origin is None: header["ORIGIN"] = f"Astrohack v{current_version}" outfile = filename else: header["ORIGIN"] = f"Astrohack v{current_version}: {origin}" outfile = add_prefix(filename, origin) if reorder_axis: hdu = fits.PrimaryHDU(_reorder_axes_for_fits(data)) else: hdu = fits.PrimaryHDU(data) for key in header.keys(): hdu.header.set(key, header[key]) hdu.writeto(outfile, overwrite=True) return
def _reorder_axes_for_fits(data: np.ndarray): carta_dim_order = ( 1, 0, 2, 3, ) shape = data.shape n_dim = len(shape) if n_dim == 5: # Ignore the time dimension and flip polarization and frequency axes transpo = np.transpose(data[0, ...], carta_dim_order) # Flip vertical axis return np.flip(transpo, 2) elif n_dim == 2: return np.flipud(data) else: raise RuntimeError("This should be a blocked path")
[docs] def put_resolution_in_fits_header(header, resolution): """ Adds resolution information to standard header keywords: BMAJ, BMIN and BPA Args: header: The dictionary header to be augmented resolution: The lenght=2 array with the resolution elements Returns: The augmented header dictionary """ if resolution is None: return header if resolution[0] >= resolution[1]: header["BMAJ"] = resolution[0] header["BMIN"] = resolution[1] header["BPA"] = 0.0 else: header["BMAJ"] = resolution[1] header["BMIN"] = resolution[0] header["BPA"] = 90.0 return header
[docs] def put_axis_in_fits_header(header: dict, axis, iaxis, axistype, unit, iswcs=True): """ Process an axis to create a FITS compatible linear axis description Args: header: The header to add the axis description to axis: The axis to be described in the header iaxis: The position of the axis in the data axistype: Axis type to be displayed in the fits header unit: Axis unit iswcs: Is the axis a part of World Coordinate System for the image? Returns: The augmented header """ outheader = header.copy() try: wcsaxes = outheader["WCSAXES"] except KeyError: wcsaxes = 0 naxis = len(axis) if naxis == 1: inc = axis[0] else: inc = axis[1] - axis[0] if inc == 0: msg = "Axis increment is zero valued" logger.error(msg) raise ValueError(msg) absdiff = abs((axis[-1] - axis[-2]) - inc) / inc if absdiff > 1e-7: msg = "Axis is not linear!" logger.error(msg) raise ValueError(msg) ref = naxis // 2 val = axis[ref] if iswcs: wcsaxes += 1 outheader[f"WCSAXES"] = wcsaxes outheader[f"NAXIS{iaxis}"] = naxis outheader[f"CRVAL{iaxis}"] = val - inc outheader[f"CDELT{iaxis}"] = inc outheader[f"CRPIX{iaxis}"] = float(ref) outheader[f"CROTA{iaxis}"] = 0.0 outheader[f"CTYPE{iaxis}"] = axistype outheader[f"CUNIT{iaxis}"] = unit return outheader
[docs] def put_stokes_axis_in_fits_header(header, iaxis): """ Inserts a dedicated stokes axis in the header at iaxis Args: header: The header to add the axis description to iaxis: The position of the axis in the data Returns: The augmented header """ outheader = header.copy() try: wcsaxes = outheader["WCSAXES"] except KeyError: wcsaxes = 0 wcsaxes += 1 outheader[f"WCSAXES"] = wcsaxes outheader[f"NAXIS{iaxis}"] = 4 outheader[f"CRVAL{iaxis}"] = 1.0 outheader[f"CDELT{iaxis}"] = 1.0 outheader[f"CRPIX{iaxis}"] = 1.0 outheader[f"CROTA{iaxis}"] = 0.0 outheader[f"CTYPE{iaxis}"] = "STOKES" outheader[f"CUNIT{iaxis}"] = "" return outheader