Source code for astrohack.scripts.beamcuts.beamcut_reduction_pipeline

import argparse
from toolviper.dask.client import local_client
from astrohack import (
    extract_pointing,
    extract_holog,
    beamcut,
    open_pointing,
    open_beamcut,
)
from astrohack.utils.user_interaction import initialization_check, MessageBoard
import inspect


[docs] def create_param_dict(args): extensions = { "point": ".point.zarr", "holog": ".holog.zarr", "beamcut": ".beamcut.zarr", "exports": ".exports", } param_dict = {} param_dict.update(vars(args)) param_dict["parallel"] = args.ncores >= 2 if args.root_name is None: name_components = args.ms_name.split(".")[:-1] if len(name_components) == 0: param_dict["root_name"] = args.ms_name else: param_dict["root_name"] = ".".join(name_components) for identifier, extension in extensions.items(): param_dict[f"{identifier}_name"] = param_dict["root_name"] + extension if args.antenna != "all": param_dict["antenna"] = args.antenna.split(",") if args.spectral_window != "all": param_dict["spectral_window"] = [ int(spw_id) for spw_id in args.spectral_window.split(",") ] if args.exclude_bad_antennas is not None: param_dict["exclude_bad_antennas"] = args.exclude_bad_antennas.split(",") initialization_check(param_dict, "Beam cut reduction parameters") param_dict["ant"] = param_dict["antenna"] param_dict["ddi"] = param_dict["spectral_window"] param_dict["exclude_antennas"] = param_dict["exclude_bad_antennas"] return param_dict
[docs] def list_input_tooltip(example): return f"for a list use comma separated values with no spaces, e.g.: '{example}'"
[docs] def parse(): parser = argparse.ArgumentParser(description="Beam cut reduction pipeline") parser.add_argument("ms_name", type=str, help="Path to the input ms to process.") parser.add_argument( "-r", "--root-name", type=str, default=None, help="Root name for the products of the pipeline, default" " is ms_name without extension", ) parser.add_argument( "-s", "--spectral-window", type=str, default="all", help=f"Select SPWs for which to produce beam cuts, {list_input_tooltip('0,1,2')}, default is %(default)s", ) parser.add_argument( "-a", "--antenna", type=str, default="all", help="Select antennas for which to produce beam cuts, " f"{list_input_tooltip('ea01,ea02')}, default is %(default)s", ) parser.add_argument( "-n", "--ncores", type=int, default=4, help="Number of cores to use, default is %(default)d", ) parser.add_argument( "-m", "--memory-per-core", type=str, default="10GB", help="Memory per core to use, default is %(default)s", ) parser.add_argument( "-o", "--overwrite", action="store_true", help="Overwrite existing files if found", ) parser.add_argument( "-d", "--data-column", type=str, default="CORRECTED_DATA", help="Data column to be extracted from MS, default is %(default)s", ) parser.add_argument( "-y", "--assume-yes", action="store_true", help="Assume yes on proceed." ) # Example of parameter with choice parser.add_argument( "--starting-stage", type=str, default="extract_pointing", choices=["extract_pointing", "extract_holog", "beamcut", "plotting"], help="Starting stage in which to start processing (default: %(default)s).", ) parser.add_argument( "--dpi", type=int, default=300, help="Dots Per Inch for plotting, default is %(default)d", ) parser.add_argument( "--plot-array-configuration", action="store_true", help="Plot array configuration, default is %(default)s", ) parser.add_argument( "--plot-pointing", action="store_true", help="Plot antenna pointing, default is %(default)s", ) parser.add_argument( "--exclude-bad-antennas", default=None, type=str, help=f"Exclude antennas with bad data, {list_input_tooltip('ea18,ea01')}, default is %(default)s.", ) args = parser.parse_args() param_dict = create_param_dict(args) return param_dict
[docs] def created_filtered_kwargs_dict(param_dict, function): valid_kwarg_keys = inspect.signature(function).parameters filtered_dict = { key: value for key, value in param_dict.items() if key in valid_kwarg_keys } return filtered_dict
[docs] def execute_step(param_dict, function, next_stage, msger): function_name = function.__name__ if param_dict["processing_stage"] == function_name: try: msger.one_liner(f"Executing {function_name}...") function(**created_filtered_kwargs_dict(param_dict, function)) msger.done() param_dict["processing_stage"] = next_stage return True, None except Exception as the_exception: return False, the_exception else: return True, None
[docs] def data_reduction(param_dict, msger): status = True exec_exception = None exec_list = [ ["extract_holog", extract_pointing], ["beamcut", extract_holog], ["plotting", beamcut], ] for next_stage, function in exec_list: if status: status, exec_exception = execute_step( param_dict, function, next_stage, msger ) if not status: raise RuntimeError( f"{param_dict['processing_stage']} failed, see above for details." ) from exec_exception return
[docs] def post_processing(param_dict, msger): param_dict["destination"] = param_dict["exports_name"] bmc_mds = open_beamcut(param_dict["beamcut_name"]) if bmc_mds is not None: msger.heading("Producing pipeline exports...") beamcut_methods = [ bmc_mds.plot_beamcut_in_amplitude, bmc_mds.plot_beamcut_in_phase, bmc_mds.plot_beamcut_in_db, bmc_mds.plot_beamcut_lm_offsets, bmc_mds.export_beamcut_report, ] for method in beamcut_methods: msger.one_liner(f"Running {method.__name__}...") method(**created_filtered_kwargs_dict(param_dict, method)) msger.one_liner("Beamcut exports Done!") if param_dict["plot_array_configuration"] or param_dict["plot_pointing"]: pnt_mds = open_pointing(param_dict["point_name"]) if pnt_mds is not None: msger.heading("Producing pointing exports...") pnt_methods = [] if param_dict["plot_array_configuration"]: pnt_methods.append(pnt_mds.plot_array_configuration) if param_dict["plot_pointing"]: pnt_methods.append(pnt_mds.plot_pointing_in_time) for method in pnt_methods: msger.one_liner(f"Running {method.__name__}...") method(**created_filtered_kwargs_dict(param_dict, method)) msger.one_liner("Pointing exports Done!") return
[docs] def main(): msger = MessageBoard() msger.heading("Welcome to AstroHACK BeamCut Pipeline") main_param_dict = parse() if main_param_dict["parallel"]: client = local_client( cores=main_param_dict["ncores"], memory_limit=main_param_dict["memory_per_core"], ) else: client = None main_param_dict["processing_stage"] = main_param_dict["starting_stage"] data_reduction(main_param_dict, msger) if main_param_dict["processing_stage"] == "plotting": post_processing(main_param_dict, msger) if main_param_dict["parallel"]: client.shutdown() msger.heading("Beamcut pipeline complete!")