Source code for astrohack.scripts.beamcuts.beamcut_CASA_calibration

import argparse
import os
import shutil

import numpy as np
import casatools
from pathlib import Path
from casatasks import importasdm, gaincal, bandpass, applycal
from astrohack.utils.user_interaction import MessageBoard, initialization_check


[docs] def parse(): parser = argparse.ArgumentParser(description="Beam cut CASA calibration pipeline") parser.add_argument("filename", type=str, help="Path to the input MS/ASDM file") parser.add_argument("refant", type=str, help="Reference antenna for calibration") parser.add_argument( "-r", "--root-name", type=str, default=None, help="Root name for the calibration tables, default is filename without extension", ) parser.add_argument( "-f", "--beamcut-field", default=None, type=str, help="Field Id or name of the beam cut data (default is to determine it from data)", ) parser.add_argument( "-o", "--overwrite", default=False, action="store_true", help="Overwrite existing calibration files", ) parser.add_argument( "-y", "--assume-yes", action="store_true", help="Assume yes on proceed." ) parser.add_argument( "-q", "--quack-nchan", default=4, type=int, help="Number of channels to quack at the edge of the spectral window (default is %(default)s)", ) param_dict = vars(parser.parse_args()) return param_dict
[docs] class CalObject: def __init__(self, param_dict: dict, msger: MessageBoard):
[docs] self.filename = param_dict["filename"]
[docs] self.refant = param_dict["refant"]
[docs] self.overwrite = param_dict["overwrite"]
[docs] self.is_asdm = self._is_asdm()
if param_dict["root_name"] is None: base_cal_name = f"{self.filename}." else: base_cal_name = param_dict["root_name"] + "."
[docs] self.delay_caltable = base_cal_name + "delay.cal"
[docs] self.bandpass_caltable = base_cal_name + "bandpass.cal"
[docs] self.gain_caltable = base_cal_name + "gain.cal"
if self.is_asdm: self.msname = f"{self.filename}.ms" msger.one_liner("Input is an SDM running importasdm...") self.asdm_to_ms() msger.done() else: self.msname = self.filename self._initialize_metadata(param_dict["quack_nchan"]) param_dict.update( { key: value for key, value in vars(self).items() if not key.startswith("__") } ) initialization_check( param_dict, "CASA calibration parameters", )
[docs] self.msger = msger
def _is_asdm(self): file_path = Path(f"{self.filename}/ASDM.xml") return file_path.exists()
[docs] def asdm_to_ms(self): if os.path.exists(self.msname) and self.overwrite: self.msger.heading(f"Removing previous ms file ({self.msname})") shutil.rmtree(self.msname) importasdm( asdm=self.filename, vis=self.msname, createmms=False, ocorr_mode="co", lazy=False, asis="Receiver CalAtmosphere", process_caldevice=True, process_pointing=True, savecmds=True, outfile=f"{self.filename}.flagonline.txt", bdfflags=False, with_pointing_correction=True, applyflags=True, overwrite=False, ) return
def _initialize_metadata(self, quack_nchan): # Fetch metadata from ms msmd = casatools.msmetadata() msmd.open(self.msname) cal_scans = msmd.scansforintent("*PHASE*") beamcut_scans = msmd.scansforintent("*MAP*ON_SOURCE") spw_list = msmd.spwsforintent("*MAP*") beamcut_fields = np.unique(msmd.fieldsforscans(beamcut_scans)) nchan = np.unique([msmd.nchan(i_spw) for i_spw in spw_list]) msmd.done() if beamcut_fields.size > 1: raise RuntimeError("More than 1 beam cut field, try splitting the ms") if nchan.size > 1: raise RuntimeError( "Spectral windows have different nchans, don't know how to proceed automatically" ) fchan = quack_nchan lchan = nchan[0] - quack_nchan # Convert to comma-separated string self.calibration_scans = ",".join(map(str, cal_scans)) self.beamcut_scans = ",".join(map(str, beamcut_scans)) self.beamcut_field = beamcut_fields[0] minspw = f"{round(np.min(spw_list)):d}" maxspw = f"{round(np.max(spw_list)):d}" spwrange = f"{minspw}~{maxspw}" self.quacked_spw_selection = f"{spwrange}:{fchan}~{lchan}" def _do_calibration(self, cal_name): if os.path.exists(cal_name): if self.overwrite: self.msger.one_liner(f"{cal_name} exists, overwriting.") return True else: self.msger.one_liner(f"{cal_name} exists, keeping it.") return False else: self.msger.one_liner(f"{cal_name} does not exist, creating it...") return True
[docs] def delay_calibration(self): self.msger.one_liner("Delay calibration...") if self._do_calibration(self.delay_caltable): gaincal( vis=self.msname, caltable=self.delay_caltable, refant=self.refant, solint="inf", spw=self.quacked_spw_selection, scan=self.calibration_scans, gaintype="K", ) self.msger.done() else: self.msger.one_liner("Skipping delay calibration...") return
[docs] def bandpass_calibration(self): self.msger.one_liner("Bandpass calibration...") if self._do_calibration(self.bandpass_caltable): bandpass( vis=self.msname, caltable=self.bandpass_caltable, refant=self.refant, solint="10s", spw=self.quacked_spw_selection, solnorm=True, scan=self.calibration_scans, gaintable=[self.delay_caltable], ) self.msger.done() else: self.msger.one_liner("Skipping bandpass calibration...") return
[docs] def gain_calibration(self): self.msger.one_liner("Gain calibration...") if self._do_calibration(self.gain_caltable): gaincal( vis=self.msname, caltable=self.gain_caltable, refant=self.refant, calmode="ap", solint="inf", spw=self.quacked_spw_selection, minsnr=2, minblperant=2, scan=self.calibration_scans, gaintable=[self.delay_caltable, self.bandpass_caltable], ) self.msger.done() else: self.msger.one_liner("Skipping gain calibration...") return
[docs] def apply_calibration(self): self.msger.one_liner("Applying calibration...") applycal( vis=self.msname, field=f"{self.beamcut_field}", spw=self.quacked_spw_selection, applymode="calonly", gaintable=[self.delay_caltable, self.bandpass_caltable, self.gain_caltable], ) self.msger.done() return
[docs] def calibration_pipeline(self): self.delay_calibration() self.bandpass_calibration() self.gain_calibration() return
[docs] def main(): msger = MessageBoard() msger.heading("Welcome to CASA beam cut calibration pipeline") cal_param_dict = parse() mycal_obj = CalObject(cal_param_dict, msger) mycal_obj.calibration_pipeline() mycal_obj.apply_calibration() msger.heading("Beamcut calibration complete!")