Source code for astroq.driver

"""
Module for executing AstroQ functions based on command line interface inputs.
"""

# Standard library imports
import logging
import os
from datetime import datetime
from configparser import ConfigParser

# Third-party imports
import numpy as np
import pandas as pd
import astroplan as apl
import plotly.io as pio
from astropy.time import Time, TimeDelta
from io import BytesIO
import imageio.v3 as iio
import base64

# Local imports
import astroq.access as ac
import astroq.benchmarking as bn
import astroq.queue.kpfcc as kpfcc
import astroq.history as hs
import astroq.io as io
import astroq.nplan as nplan
import astroq.plot as pl
import astroq.splan as splan
import astroq.webapp as app

# Module-level logger named after this module. Its threshold is set to INFO so
# that info-level messages (and above) pass this logger; no handlers are
# configured here.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)  # Lower level to capture more messages

def bench(args):
    """
    Benchmark the AstroQ pipeline using a toy model.

    Builds the toy request frame, thins it, writes it to ``request.csv`` in
    the configured working directory and then runs the semester planner.

    Args:
        args (argparse.Namespace): the command line arguments with flags:
            -cf (str): the path to the config file.
            -ns (int): the number of slots needed to complete each of the
                single shot requests in program 6.
            -thin (int): the factor to thin the request frame by (for very
                fast testing).

    Returns:
        None
    """
    config_path = args.config_file
    slots_per_request = args.number_slots
    thin_factor = args.thin
    for label, value in (
        ('config_file', config_path),
        ('number_slots', slots_per_request),
        ('thin', thin_factor),
    ):
        print(f'bench function: {label} is {value}')

    # The working directory is where the toy request.csv gets written.
    parser = ConfigParser()
    parser.read(config_path)
    workdir = parser.get('global', 'workdir')

    toy_requests = bn.build_toy_model_from_paper(slots_per_request)

    # Star names must always be treated as strings.
    if 'starname' in toy_requests.columns:
        toy_requests['starname'] = toy_requests['starname'].astype(str)

    # Keep every ``thin_factor``-th row.
    thinned = toy_requests.iloc[::thin_factor]
    print(f'Request frame thinned: {len(toy_requests)} -> {len(thinned)} rows (factor of {thin_factor})')
    thinned.to_csv(os.path.join(workdir, "request.csv"))

    # Run the schedule directly from the config file.
    planner = splan.SemesterPlanner(config_path, run_band3=False)
    planner.run_model()
def _write_programs_csv(savepath, semester, hours_by_program, nights_by_program,
                        filler_program=None):
    """Write ``programs.csv`` into *savepath* and return the awarded program IDs.

    Args:
        savepath (str): directory in which ``programs.csv`` is written.
        semester (str): semester code prefixed (with ``_``) to every program key.
        hours_by_program (dict): awarded hours keyed by program code.
        nights_by_program (dict): awarded nights keyed by program code.
        filler_program (str | None): when given, one extra row is appended for
            this program with 600 hours / 50 nights.

    Returns:
        list[str]: the awarded program IDs (``<semester>_<code>``), without the
        filler program.
    """
    awarded_programs = [semester + "_" + val for val in hours_by_program.keys()]
    programmatics = pd.DataFrame({
        'program': awarded_programs,
        'hours': list(hours_by_program.values()),
        'nights': list(nights_by_program.values()),
    })
    if filler_program is not None:
        # Manually add one row for the Engineering program of bright backup
        # stars. Arbitrarily give it 50 nights of time. This is intentionally
        # high so that this "program" is not effectively throttled.
        filler_row = pd.DataFrame(
            [{'program': filler_program, 'hours': 600.0, 'nights': 50.0}]
        )
        programmatics = pd.concat([programmatics, filler_row], ignore_index=True)
    programmatics.to_csv(os.path.join(savepath, 'programs.csv'), index=False)
    return awarded_programs


def _validate_past_history_file(past_source):
    """Warn if a user-supplied past-history CSV is absent or lacks required columns."""
    required_columns = ['id', 'target', 'semid', 'timestamp', 'exposure_time', 'observer']
    if not os.path.exists(past_source):
        log.warning("Past history file '%s' does not exist", past_source)
        return

    # Only the header is needed to check the column names.
    actual_columns = set(pd.read_csv(past_source, nrows=0).columns)
    missing_columns = set(required_columns) - actual_columns
    has_ut_start = (
        'exposure_start_time_UT' in actual_columns
        or 'exposure_start_time' in actual_columns
    )
    if not has_ut_start:
        missing_columns.add('exposure_start_time_UT or exposure_start_time')

    if missing_columns:
        log.warning(
            "Past history file '%s' is missing required columns: %s",
            past_source, missing_columns,
        )
    else:
        print("Past history file columns validated: all required columns present")


def kpfcc_prep(args):
    """
    Prepare the KPF-CC program for a new semester.

    This function is specific to the KPF-CC program and the observatory's
    infrastructure. If you are adapting AstroQ for a new observatory, you will
    need to write your own module to connect to a new
    ``prep <your observatory>`` command.

    Files written into the configured ``workdir``: ``programs.csv`` and the
    allocation file always; the request, custom and filler files when
    ``request_source == 'db'``; the past-history file when
    ``past_source == 'db'``.

    Args:
        args (argparse.Namespace): the command line arguments with flags:
            -cf (str): the path to the config file.
            -band_number (int): the band number to filter the request.csv by.
            -is_full_band (bool): whether this is a full-band that should
                update allocation.csv.
            -allo_source (str): the source of the allocation information,
                either 'db' or a file path.
            -past_source (str): the source of the past history information,
                either 'db' or a file path.
            -request_source (str): the source of the request information,
                either 'db' or a file path.
            -filler_programs (str): the semester ID for the filler program.
                Ex. 2025B_E473.

    Returns:
        None

    Raises:
        KeyError: if ``band_number`` is not 1, 2 or 3 when requests are pulled
            from the database (no slowdown factor is defined for it).
    """
    cf = args.config_file
    print(f'kpfcc_prep function: config_file is {cf}')
    config = ConfigParser()
    config.read(cf)
    band_number = args.band_number
    is_full_band = args.is_full_band

    # Everything is written into the working directory from the global section.
    savepath = str(config.get('global', 'workdir'))
    semester = str(config.get('global', 'semester'))
    start_date = str(config.get('global', 'semester_start_day'))
    end_date = str(config.get('global', 'semester_end_day'))
    current_date = str(config.get('global', 'current_day'))
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    n_days = (end - start).days

    # CAPTURE ALLOCATION INFORMATION AND PROCESS
    # --------------------------------------------
    allo_source = args.allo_source
    allocation_file = str(config.get('data', 'allocation_file'))
    if allo_source == 'db':
        print('Pulling allocation information from database')
        conversion_ratio = config.getfloat('semester', 'hours_per_night')
        allocation_frame, hours_by_program, nights_by_program = kpfcc.pull_allocation_info(
            start_date, n_days, 'KPF-CC', conversion_ratio
        )
        # The filler row is only added when a filler program was actually
        # given; otherwise programs.csv would gain a row with no program name.
        awarded_programs = _write_programs_csv(
            savepath, semester, hours_by_program, nights_by_program,
            filler_program=args.filler_programs,
        )
    else:
        # NOTE: any non-'db' source is treated as a KOIP file; pre-formatted
        # allocation files are not validated or handled separately here.
        print(f'Using allocation information from Keck Observatory Instrument Plan (KOIP) file: {allo_source}')
        allocation_frame, hours_by_program, nights_by_program = kpfcc.format_keck_allocation_info(allo_source)
        awarded_programs = _write_programs_csv(
            savepath, semester, hours_by_program, nights_by_program
        )

    allocation_frame['comment'] = [''] * len(allocation_frame)
    # Update allocation times for tonight if this is a full-band
    if is_full_band:
        print("Updating allocation.csv for full-band")
        allocation_frame = kpfcc.update_allocation_file(allocation_frame, current_date)
    allocation_frame.sort_values(by='start', inplace=True)
    allocation_frame.to_csv(os.path.join(savepath, allocation_file), index=False)

    # CAPTURE REQUEST INFORMATION AND PROCESS
    # --------------------------------------------
    if args.request_source == 'db':
        # Add filler programs if specified
        fillers = args.filler_programs
        # temporarily comment out this block for 2025B.
        if fillers is not None:
            print(f'Adding filler program to awarded_programs: {fillers}')
            awarded_programs.append(fillers)

        # Pull the request sheet
        request_file = str(config.get('data', 'request_file'))
        OBs = kpfcc.pull_OBs(semester)
        good_obs, bad_obs_values, bad_obs_hasFields, *_ = kpfcc.get_request_sheet(
            OBs, awarded_programs, os.path.join(savepath, request_file)
        )

        # Filter the request sheet by weather band
        filtered_good_obs = kpfcc.filter_request_csv(good_obs, band_number)
        # If no exposure meter threshold set, then OB can only be part of band 1
        if band_number != 1:
            filtered_good_obs = filtered_good_obs[filtered_good_obs['exp_meter_threshold'] != -1.0]
        filtered_good_obs.reset_index(drop=True, inplace=True)

        # Compute nominal exposure times and increase exposure times for
        # different bands
        slowdown_factors = {1: 1.0, 2: 2.0, 3: 4.0}
        slow = slowdown_factors[band_number]
        new_exptimes = kpfcc.recompute_exposure_times(filtered_good_obs, slow)
        filtered_good_obs['original_exptime'] = filtered_good_obs['exptime']
        filtered_good_obs['exptime'] = new_exptimes
        filtered_good_obs.to_csv(os.path.join(savepath, request_file), index=False)

        # CAPTURE CUSTOM INFORMATION AND PROCESS
        # --------------------------------------------
        custom_file = str(config.get('data', 'custom_file'))
        custom_frame = kpfcc.format_custom_csv(OBs)
        custom_frame.to_csv(os.path.join(savepath, custom_file), index=False)

        # CAPTURE FILLER REQUEST INFORMATION AND PROCESS
        # --------------------------------------------
        # Now get the bright backup stars information from the filler program
        filler_file = str(config.get('data', 'filler_file'))
        if fillers is not None:
            print(f'Generating filler.csv from program: {fillers}')
            good_obs_backup, *_ = kpfcc.get_request_sheet(
                OBs, [fillers], os.path.join(savepath, filler_file)
            )
        else:
            print('No fillers specified, creating blank filler.csv file.')
            good_obs_backup = pd.DataFrame(columns=good_obs.columns)
        filtered_good_obs_backup = kpfcc.filter_request_csv(good_obs_backup, band_number)
        filtered_good_obs_backup.to_csv(os.path.join(savepath, filler_file), index=False)

        # CAPTURE EMAIL INFORMATION AND PROCESS
        # --------------------------------------------
        send_emails_with = []
        for i in range(len(bad_obs_values)):
            if bad_obs_values['metadata.semid'][i] in awarded_programs:
                send_emails_with.append(kpfcc.inspect_row(bad_obs_hasFields, bad_obs_values, i))
        # TODO: this is where code to automatically send emails will go.
        # Not implemented yet; ``send_emails_with`` is collected for it.
    else:
        print(f'User specified request source: {args.request_source}')
        print("No action taken on request.csv")
        print("User must also supply a custom.csv file and a filler.csv file.")

    # CAPTURE PAST HISTORY INFORMATION AND PROCESS
    # --------------------------------------------
    past_source = args.past_source
    past_file = str(config.get('data', 'past_file'))
    if past_source == 'db':
        print('Pulling past history information from database')
        raw_history = kpfcc.pull_OB_histories(semester)
        utc_offset_hours = config.getfloat('global', 'UTCoffset', fallback=-10)
        obhist = hs.write_OB_histories_to_csv(
            raw_history,
            utc_offset_hours=utc_offset_hours,
            observatory=config.get('global', 'observatory'),
        )
        obhist.to_csv(os.path.join(savepath, past_file), index=False)
    else:
        print(f'Using past history information from file: {past_source}')
        # The user's file is only validated here; it is not copied or rewritten.
        _validate_past_history_file(past_source)
def kpfcc_webapp(args):
    """
    Launch web app to view interactive plots.

    Args:
        args (argparse.Namespace): the command line arguments with flags:
            -uptree_path (str): the path to the uptree directory below which
                the folder structure is <semester_code>/<date>/<band>/.

    Returns:
        None
    """
    # Thin CLI shim: hand the directory straight to the web app launcher.
    app.launch_app(args.uptree_path)
def plan_semester(args):
    """
    Run the core optimization algorithm for determining what stars to observe
    on what nights.

    Args:
        args (argparse.Namespace): the command line arguments with flags:
            -cf (str): the path to the config file.
            -run_band3 (bool): whether to run the band 3 filler program.

    Returns:
        None
    """
    config_path = args.config_file
    print(f'plan_semester function: config_file is {config_path}')
    include_band3 = args.run_band3
    print(f'plan_semester function: b3 is {include_band3}')

    # Build the planner from the config file and solve the semester model.
    splan.SemesterPlanner(config_path, include_band3).run_model()
def plan_night(args):
    """
    Run the slew path optimization using the TTP package for a given night's
    selected targets.

    Args:
        args (argparse.Namespace): the command line arguments with flags:
            -cf (str): the path to the config file.

    Returns:
        None
    """
    config_path = args.config_file
    print(f'plan_night function: config_file is {config_path}')

    planner = nplan.NightPlanner(config_path)
    # The planner is only saved to HDF5 when run_ttp() returns a truthy value.
    if planner.run_ttp():
        planner.to_hdf5()
def _write_html_files(directory, pages):
    """Write each ``filename -> html`` entry of *pages* into *directory* as UTF-8 text."""
    for filename, html in pages.items():
        with open(os.path.join(directory, filename), "w", encoding="utf-8") as f:
            f.write(html)


def _figure_to_html(fig):
    """Render a plotly figure as a standalone HTML page that loads plotly.js from the CDN."""
    return pio.to_html(fig, full_html=True, include_plotlyjs='cdn')


def plot(args):
    """
    Generate html files of the standard AstroQ output plots. Mirrors those
    produced by the webapp.

    Semester-level plots are produced when ``outputs/semester_planner.h5``
    exists in the working directory, and night-level plots when
    ``outputs/night_planner.h5`` exists. The two are independent: if only the
    night planner file is present, the night plots are still written (to
    ``<workdir>/outputs/saved_plots``), except for the timebar plot, which
    needs the semester planner and is skipped.

    Args:
        args (argparse.Namespace): the command line arguments with flags:
            -cf (str): the path to the config file.

    Returns:
        None
    """
    cf = args.config_file
    print(f'plot function: using config file from {cf}')
    config = ConfigParser()
    config.read(cf)
    semester_directory = config.get('global', 'workdir')
    outputs_directory = os.path.join(semester_directory, 'outputs')

    # Defaults used by the night-planner section when no semester planner is
    # available; previously these names were simply undefined in that case.
    semester_planner = None
    all_stars_from_all_programs = None
    saveout = os.path.join(outputs_directory, "saved_plots")

    semester_planner_h5 = os.path.join(outputs_directory, 'semester_planner.h5')
    if os.path.exists(semester_planner_h5):
        semester_planner = splan.SemesterPlanner.from_hdf5(semester_planner_h5)
        saveout = os.path.join(semester_planner.output_directory, "saved_plots")
        os.makedirs(saveout, exist_ok=True)

        data_astroq = pl.process_stars(semester_planner)
        all_stars_from_all_programs = np.concatenate(list(data_astroq[0].values()))
        program_values = list(data_astroq[1].values())

        # build the plots
        request_df = pl.get_request_frame(semester_planner, all_stars_from_all_programs)
        request_table_html = pl.dataframe_to_html(request_df)
        fig_cof = pl.get_cof(semester_planner, program_values)
        fig_birdseye = pl.get_birdseye(semester_planner, data_astroq[2], program_values)
        fig_football = pl.get_football(
            semester_planner, all_stars_from_all_programs, use_program_colors=True
        )
        fig_tau_inter_line = pl.get_tau_inter_line(
            semester_planner, all_stars_from_all_programs, use_program_colors=True
        )

        # render everything first, then write out the html files
        _write_html_files(saveout, {
            "request_table.html": request_table_html,
            "all_programs_COF.html": _figure_to_html(fig_cof),
            "all_programs_birdseye.html": _figure_to_html(fig_birdseye),
            "all_programs_football.html": _figure_to_html(fig_football),
            "all_programs_tau_inter_line.html": _figure_to_html(fig_tau_inter_line),
        })
    else:
        print(f'No semester_planner.h5 found in {semester_directory}/outputs/. No plots will be generated.')

    night_planner_h5 = os.path.join(outputs_directory, 'night_planner.h5')
    if os.path.exists(night_planner_h5):
        night_planner = nplan.NightPlanner.from_hdf5(night_planner_h5)
        data_ttp = night_planner.solution

        # Get the night start time from allocation file (this is "Minute 0")
        night_start_time, _ = nplan.get_nightly_times_from_allocation(
            night_planner.allocation_file, night_planner.current_day
        )

        # build the plots
        script_table_df = pl.get_script_plan(night_planner)
        timebar_fig = None
        if semester_planner is not None:
            timebar_fig = pl.get_timebar(
                semester_planner, all_stars_from_all_programs, use_program_colors=False
            )
        else:
            print('Skipping timebar_plot.html: it requires semester_planner.h5.')
        ladder_fig = pl.get_ladder(data_ttp, night_start_time)
        slew_animation_fig = pl.get_slew_animation_plotly(
            data_ttp, os.path.join(semester_directory, "request.csv"), animationStep=120
        )
        slew_path_fig = pl.plot_path_2D_interactive(data_ttp, night_start_time=night_start_time)

        # render the html versions (dict order is the write order)
        pages = {"script_table.html": pl.dataframe_to_html(script_table_df)}
        if timebar_fig is not None:
            pages["timebar_plot.html"] = _figure_to_html(timebar_fig)
        pages["ladder_plot.html"] = _figure_to_html(ladder_fig)
        pages["slew_animation_plot.html"] = _figure_to_html(slew_animation_fig)
        pages["slew_path_plot.html"] = _figure_to_html(slew_path_fig)

        # The directory may not exist yet when only the night planner ran.
        os.makedirs(saveout, exist_ok=True)
        _write_html_files(saveout, pages)
    else:
        print(f'No night_planner.h5 found in {semester_directory}/outputs/. No plots will be generated.')
def _require(condition, message):
    """Raise ``AssertionError(message)`` unless *condition* holds (not stripped by ``python -O``)."""
    if not condition:
        raise AssertionError(message)


def requests_vs_schedule(args):
    """
    Compare the request.csv file to the schedule.csv file to ensure that the
    schedule is valid. This is a sanity check to ensure that the schedule is
    not violating any of the constraints.

    The semester planner is built from the config file and run to obtain the
    request strategy. The schedule file is read with columns ``d`` (day),
    ``s`` (slot) and ``r`` (scheduled request id). For every request the
    t_visit, n_inter_max, n_intra_min/max, tau_inter and tau_intra constraints
    are checked. Requests absent from the schedule are reported and skipped.

    Args:
        args (argparse.Namespace): the command line arguments with flags:
            -cf (str): the path to the config file.
            -schedule_file (str): the path to the schedule file.

    Returns:
        None

    Raises:
        AssertionError: if the schedule violates any constraint; the message
            names the constraint and the offending star.
    """
    cf = args.config_file
    sf = args.schedule_file
    print(f'requests_vs_schedule function: config_file is {cf}')
    print(f'requests_vs_schedule function: schedule_file is {sf}')

    # Create semester planner to get strategy data
    semester_planner = splan.SemesterPlanner(cf, run_band3=False)
    semester_planner.run_model()
    req = semester_planner.strategy

    sch = pd.read_csv(sf)
    # Re-order into the real schedule; after the reset, labels equal positions.
    sch = sch.sort_values(by=['d', 's']).reset_index(drop=True)

    # First, ensure no repeated day/slot pairs (does allow missing pairs).
    # duplicated() also behaves on an empty schedule, where max() would be NaN.
    no_duplicate_slot_err = ("'No duplicate slot' condition violated: "
                             "At least one pair of rows corresponds to "
                             "the same day and slot.")
    _require(not sch.duplicated(subset=['d', 's']).any(), no_duplicate_slot_err)

    day_slot = sch[['d', 's']]
    last_row = day_slot.index.max()
    # Compare ids as strings with a boolean mask rather than interpolating the
    # name into a query() expression, which breaks on names containing quotes.
    request_ids = req['unique_id'].astype(str)
    scheduled_ids = sch['r'].astype(str)

    for star in req.unique_id:
        star_request = req[request_ids == str(star)]
        star_schedule = sch[scheduled_ids == str(star)]  # Only the slots with the star listed

        # A star might not be scheduled at all. This does not violate
        # constraints, but should be noted.
        if len(star_schedule) == 0:
            print(f"{star} not scheduled")
            continue

        # 1) t_visit: No stars scheduled during another star's slot
        t_visit = star_request.t_visit.values[0]  # Number of slots needed to complete observation
        star_inds = star_schedule.index
        if star_inds.max() == last_row:
            # The very last observation in the whole schedule has no successor
            # and cannot be overlapped by a later target, so drop it.
            star_inds = star_inds[:-1]
        following = day_slot.iloc[star_inds + 1].reset_index(drop=True)
        current = day_slot.iloc[star_inds].reset_index(drop=True)
        gaps_to_next = following - current
        # d == 0 when the next obs is on the same day. If the target is always
        # the last obs of the night there is nothing to check.
        same_night_gaps = gaps_to_next[gaps_to_next['d'] == 0]
        if not same_night_gaps.empty:
            closest_slot_separation = same_night_gaps['s'].min()
            t_visit_err = ("t_visit violated: "
                           "Two stars are scheduled too close together: "
                           f"{star} requires {t_visit} slots but another "
                           f"star is scheduled after only {closest_slot_separation}.")
            _require(closest_slot_separation >= t_visit, t_visit_err)

        # 2) n_inter_max: the number of unique nights on which the target is
        # scheduled must not exceed n_inter_max
        n_inter_max = star_request['n_inter_max'].values[0]
        n_inter_sch = star_schedule['d'].nunique()
        n_inter_max_err = ("n_inter_max violated: "
                           f"{star} is scheduled too many times in the semester "
                           f"(scheduled: {n_inter_sch} obs; required: {n_inter_max} obs)")
        _require(n_inter_sch <= n_inter_max, n_inter_max_err)

        # 3) n_intra_min, n_intra_max: N obs per day is between n_intra_min
        # and n_intra_max
        n_intra_min, n_intra_max = star_request[['n_intra_min', 'n_intra_max']].values[0]
        obs_per_night = star_schedule.groupby(['d']).size()
        n_intra_min_sch, n_intra_max_sch = obs_per_night.min(), obs_per_night.max()
        n_intra_min_err = ("n_intra_min violated: "
                           f"{star} is scheduled too few times in one night "
                           f"(scheduled: {n_intra_min_sch} obs; required: {n_intra_min} obs)")
        _require(n_intra_min <= n_intra_min_sch, n_intra_min_err)
        n_intra_max_err = ("n_intra_max violated: "
                           f"{star} is scheduled too many times in one night "
                           f"(scheduled: {n_intra_max_sch} obs; required: {n_intra_max} obs)")
        _require(n_intra_max_sch <= n_intra_max, n_intra_max_err)

        # 4) tau_inter: There must be at least tau_inter nights between
        # successive nights during which a target is observed
        tau_inter = star_request['tau_inter'].values[0]  # min num of nights before another obs
        if tau_inter > 0:  # only run this test if the intention is to schedule more than once
            unique_days = np.sort(star_schedule['d'].unique())
            if len(unique_days) > 1:  # a single night cannot be spaced too closely
                min_day_gaps = np.min(np.diff(unique_days))
                tau_inter_err = ("tau_inter violated: "
                                 f"two obs of {star} are not spaced by enough days "
                                 f"(scheduled: {min_day_gaps} days; required: {tau_inter} days)")
                _require(min_day_gaps >= tau_inter, tau_inter_err)

        # 5) tau_intra: There must be at least tau_intra slots between
        # successive observations of a target in a single night
        tau_intra_slots = star_request['tau_intra'].values[0]  # already in units of slots
        # Successive slot differences within each night. Differences are not
        # computed across nights (those entries are NaN), so the minimum is NaN
        # when the star is never observed twice in the same night.
        min_slot_diffs = star_schedule.groupby('d').s.diff().min()
        if n_intra_max > 1 and not pd.isna(min_slot_diffs):
            tau_intra_err = ("tau_intra_violated: "
                             f"two obs of {star} are not spaced by enough slots "
                             f"(scheduled: {min_slot_diffs} slots; required: {tau_intra_slots} slots)")
            _require(min_slot_diffs >= tau_intra_slots, tau_intra_err)