Source code for astroq.splan

"""
Module that defines the SemesterPlanner class. This class is responsible for defining, building, and solving the
Gurobi model for semester-level observation planning. It is nearly completely agnostic to all astronomy knowledge.

"""

# Standard library imports
import logging
import os
import time
import warnings
from configparser import ConfigParser
from datetime import datetime, timedelta
import pickle
import json
import h5py

# Third-party imports
import gurobipy as gp
from gurobipy import GRB
import numpy as np
import pandas as pd
from astropy.time import Time, TimeDelta
import astroplan as apl

# Local imports
import astroq.access as ac
import astroq.history as hs
import astroq.io as io

# Suppress warnings
warnings.filterwarnings('ignore')

logs = logging.getLogger(__name__)

[docs] class SemesterPlanner(object): """ Define the SemesterPlanner object. This is the heart of AstroQ. This object: - manages and holds the parameters defined in the config.ini - constructs additional metadata for easy sharing/storage across functions - builds the Gurobi model - defines the constraints - sets the objective function - kicks off the model solver - serializes the results to a csv file - saves the object to an hdf5 file for use later by the nplan and plot modules """ def __init__(self, cf, run_band3): """ Initialize the SemesterPlanner object. Args: cf (str): the path to the config.ini file run_band3 (bool): whether to run the band 3 weather loss model (this will be unnecessary in the 2026A semester) Returns: None """ logs.debug("Building the SemesterPlanner.") self.start_the_clock = time.time() # Read config file directly config = ConfigParser() config.read(cf) self.run_band3 = run_band3 self.config = config # Extract configuration parameters from new format workdir = str(config.get('global', 'workdir')) self.semester_directory = workdir self.current_day = str(config.get('global', 'current_day')) self.observatory = config.get('global', 'observatory') self.utc_offset_hours = config.getfloat('global', 'UTCoffset', fallback=-10) # Get semester parameters from semester section self.slot_size = config.getint('semester', 'slot_size') self.run_weather_loss = config.getboolean('semester', 'run_weather_loss') self.solve_time_limit = config.getint('semester', 'max_solve_time') self.gurobi_output = config.getboolean('semester', 'show_gurobi_output') self.solve_max_gap = config.getfloat('semester', 'max_solve_gap') self.max_bonus = config.getfloat('semester', 'maximum_bonus_size') self.run_bonus_round = config.getboolean('semester', 'run_bonus_round') self.semester_start_date = config.get('global', 'semester_start_day') semester_end_date = config.get('global', 'semester_end_day') start_date = datetime.strptime(self.semester_start_date, '%Y-%m-%d') end_date = datetime.strptime(semester_end_date, '%Y-%m-%d') self.semester_length = int((end_date - start_date).days + 1) self.semester_letter = config.get('global', 'semester')[-1] self.throttle_grace = config.getfloat('semester', 'throttle_grace') self.hours_per_night = config.getfloat('semester', 'hours_per_night') # Output directory self.output_directory = os.path.join(workdir, "outputs") check = os.path.isdir(self.output_directory) if not check: os.makedirs(self.output_directory) # Set up file paths from data section programs_file_config = str(config.get('data', 'programs_file')) if os.path.isabs(programs_file_config): self.programs_file = programs_file_config else: self.programs_file = os.path.join(self.semester_directory, programs_file_config) allocation_file_config = str(config.get('data', 'allocation_file')) if os.path.isabs(allocation_file_config): self.allocation_file = allocation_file_config else: self.allocation_file = os.path.join(self.semester_directory, allocation_file_config) # Define the input files if self.run_band3: request_file_config = str(config.get('data', 'filler_file')) self.add_twilights() else: request_file_config = str(config.get('data', 'request_file')) if os.path.isabs(request_file_config): self.request_file = request_file_config else: self.request_file = os.path.join(self.semester_directory, request_file_config) past_file_config = str(config.get('data', 'past_file')) if os.path.isabs(past_file_config): self.past_file = past_file_config else: self.past_file = os.path.join(self.semester_directory, past_file_config) custom_file_config = str(config.get('data', 'custom_file')) if os.path.isabs(custom_file_config): self.custom_file = custom_file_config else: self.custom_file = os.path.join(self.semester_directory, custom_file_config) if not os.path.exists(self.request_file): raise FileNotFoundError(f"Requests file not found: {self.request_file}") self.requests_frame_all = pd.read_csv(self.request_file) # splan must only know about the active requests mask = self.requests_frame_all['inactive'] == False logs.warning(f"There are {len(self.requests_frame_all[~mask])} inactive of {len(self.requests_frame_all)} requests.") self.requests_frame = self.requests_frame_all[mask] self.requests_frame.reset_index(drop=True, inplace=True) # Data cleaning # Fill NaN values with defaults --- for now in early 2025B since we had issues with the webform. # Replace "None" strings with NaN first, then fill with defaults to make sure we get them all self.requests_frame['n_intra_max'] = self.requests_frame['n_intra_max'].replace('None', np.nan).fillna(1) self.requests_frame['n_intra_min'] = self.requests_frame['n_intra_min'].replace('None', np.nan).fillna(1) self.requests_frame['tau_intra'] = self.requests_frame['tau_intra'].replace('None', np.nan).fillna(0) # Handle weather band columns - process each band column individually for band_num in [1, 2, 3]: weather_band_col = f'weather_band_{band_num}' if weather_band_col in self.requests_frame.columns: self.requests_frame[weather_band_col] = self.requests_frame[weather_band_col].replace('None', np.nan).fillna(False) self.requests_frame['unique_id'] = self.requests_frame['unique_id'].astype(str) self.requests_frame['starname'] = self.requests_frame['starname'].astype(str) # Build the "strategy" dataframe. Note exptime is in minutes and tau_intra is in hours they are both converted to slots here strategy = self.requests_frame[['starname', 'unique_id', 'n_intra_min','n_intra_max','n_inter_max','tau_inter']] strategy['t_visit'] = (self.requests_frame['exptime'] / 60 / self.slot_size).clip(lower=1).round().astype(int) strategy['tau_intra'] = (self.requests_frame['tau_intra'] * 60 / self.slot_size).round().astype(int) self.strategy = strategy # Compile additional data and metadata self.past_history = hs.process_star_history( self.past_file, self.utc_offset_hours, observatory=self.observatory ) self.slots_needed_for_exposure_dict = self._build_slots_required_dictionary() self.all_dates_dict, self.all_dates_array = self._build_date_dictionary() self._calculate_slot_info() # Observability represents the indices of the slots where targets are observable self.observability = self._build_observability() self.observability_tuples = list(self.observability.itertuples(index=False, name=None)) # Joiner combines strategy and observability self.joiner = pd.merge(self.strategy, self.observability, on=['unique_id']) # add dummy columns for easier joins self.joiner['unique_id2'] = self.joiner['unique_id'] self.joiner['d2'] = self.joiner['d'] self.joiner['s2'] = self.joiner['s'] # Determine the nights where multi-visit requests are observable and the list of multi-visit requests self.observability_nights = self.joiner[self.joiner['n_intra_max'] > 1][['unique_id', 'd']].drop_duplicates().copy() self.multi_visit_requests = list(self.observability_nights['unique_id'].unique()) # Define subsets of requests self.all_requests = list(self.requests_frame['unique_id']) self.schedulable_requests = list(self.joiner['unique_id'].unique()) self.single_visit_requests = [item for item in self.schedulable_requests if item not in self.multi_visit_requests] warncount = 0 for starid in list(self.requests_frame['unique_id']): if starid not in self.schedulable_requests: starname = self.requests_frame[self.requests_frame['unique_id']==starid]['starname'].values[0] #logs.warning("Target " + starname + " with unique id " + starid + " has no valid day/slot pairs and therefore is effectively removed from the model.") warncount += 1 logs.warning("There are " + str(warncount) + " targets out of " + str(len(list(self.requests_frame['unique_id']))) + " that have no valid day/slot pairs and therefore are effectively removed from the model.") # Get each request's full list of valid d/s pairs self.all_valid_ds_for_request = self.joiner.groupby(['unique_id'])[['d', 's']].agg(list) # Get all requests which are valid in slot (d, s) requests_valid_for_ds = pd.merge( self.joiner.drop_duplicates(['d', 's']), self.joiner[['unique_id', 'd', 's']], suffixes=['', '3'], on=['d', 's']) self.requests_valid_for_ds = requests_valid_for_ds.groupby(['d','s'])[['unique_id3']].agg(list) # Get all valid d/s pairs self.valid_ds_pairs = self.joiner.copy() # Get all requests that require multiple slots to complete one observation self.multislot_mask = self.joiner.t_visit > 1 self.multi_slot_frame = self.joiner[self.multislot_mask] # Get all valid slots s for request r on day d valid_s_for_rd = pd.merge( self.joiner.drop_duplicates(['unique_id','d',]), self.joiner[['unique_id','d','s']], suffixes=['','3'],on=['unique_id'] ).query('d == d3') self.slots_on_day_for_r = valid_s_for_rd.groupby(['unique_id','d'])[['s3']].agg(list) # Get all request id's that are valid on a given day self.unique_request_on_day_pairs = self.joiner.copy().drop_duplicates(['unique_id','d']) # Define the Gurobi model self.model = gp.Model('Semester_Scheduler') # Yrds is technically a 1D matrix indexed by tuples. # But in practice best think of it as a 3D ragged matrix of requests r, nights d, and slots s, with gaps. # Day d / Slot s for request r will be 1 to indicate starting an exposure for that request in that day/slot observability_array = list(self.observability.itertuples(index=False, name=None)) self.Yrds = self.model.addVars(observability_array, vtype = GRB.BINARY, name = 'Requests_Slots') if len(self.observability_nights) != 0: # Wrd is technically a 1D matrix indexed by tuples. # But in practice best think of it as a 2D ragged matrix of requests r and nights d, with gaps. # Night d for request r will be 1 to indicate at least one exposure is scheduled for this night. # Note that Wrd is only valid for requests r which have at least 2 visits requested in the night. observability_array_onsky = list(self.observability_nights.itertuples(index=False, name=None)) self.Wrd = self.model.addVars(observability_array_onsky, vtype = GRB.BINARY, name = 'OnSky') # theta is the "shortfall" variable, continous in natural numbers. self.theta = self.model.addVars(self.all_requests, name = 'Shortfall') # Set the max allowed number of observations for each request based on the past history and the requested number of observations desired_max_obs_allowed_dict = {} absolute_max_obs_allowed_dict = {} past_nights_observed_dict = {} for name in self.all_requests: idx = self.requests_frame.index[self.requests_frame['unique_id']==name][0] if name in list(self.past_history.keys()): past_nights_observed = self.past_history[name].total_n_unique_nights else: past_nights_observed = 0 # Safety valve for if the target is over-observed for any reason # if past_nights_observed > self.requests_frame['n_inter_max'][idx] + \ # int(self.requests_frame['n_inter_max'][idx]*self.max_bonus): if past_nights_observed > self.requests_frame['n_inter_max'][idx]: desired_max_obs = past_nights_observed else: desired_max_obs = (self.requests_frame['n_inter_max'][idx] - past_nights_observed) absolute_max_obs = (self.requests_frame['n_inter_max'][idx] - past_nights_observed) \ + int(self.requests_frame['n_inter_max'][idx]*self.max_bonus) # second safety valve if past_nights_observed > absolute_max_obs: absolute_max_obs = past_nights_observed past_nights_observed_dict[name] = past_nights_observed desired_max_obs_allowed_dict[name] = desired_max_obs absolute_max_obs_allowed_dict[name] = absolute_max_obs self.desired_max_obs_allowed_dict = desired_max_obs_allowed_dict self.absolute_max_obs_allowed_dict = absolute_max_obs_allowed_dict self.past_nights_observed_dict = past_nights_observed_dict logs.debug("Initializing complete.")
[docs] def _build_date_dictionary(self): """ Construct useful data structures that are used throughout the semester planner. Returns: all_dates_dict (dict): a dictionary where keys are the dates in the semester and values are the day index all_dates_array (list): a list of the dates in the semester """ start_date = datetime.strptime(self.semester_start_date, '%Y-%m-%d') end_date = datetime.strptime(self.semester_start_date, '%Y-%m-%d') + timedelta(days=self.semester_length - 1) all_dates_dict = {} all_dates_array = [] current_date = start_date day_index = 0 while current_date <= end_date: date_str = current_date.strftime('%Y-%m-%d') all_dates_dict[date_str] = day_index all_dates_array.append(date_str) current_date += timedelta(days=1) day_index += 1 return all_dates_dict, all_dates_array
[docs] def _calculate_slot_info(self): """ Compute important numbers relating to the quantity of slots. Returns: None """ # Calculate slots per quarter and night self.n_slots_in_night = int(24 * 60 / self.slot_size) # Calculate remaining semester info self.n_nights_in_semester = len(self.all_dates_dict) - self.all_dates_dict[self.current_day] self.n_slots_in_semester = self.n_slots_in_night * self.n_nights_in_semester # Calculate today's starting positions self.today_starting_slot = self.all_dates_dict[self.current_day] * self.n_slots_in_night self.today_starting_night = self.all_dates_dict[self.current_day]
[docs] def _build_slots_required_dictionary(self, always_round_up_flag=False): """ Determine the number of slots required to complete for each visit of a given request. Returns: slots_needed_for_exposure_dict (dict): a dictionary where keys are the star names and values are the number of slots required for each exposure """ slots_needed_for_exposure_dict = {} for n, row in self.requests_frame.iterrows(): starid = row['unique_id'] exposure_time = float(row['exptime']*row['n_exp']) overhead = 45*float(row['n_exp'] - 1) + 180*float(row['n_intra_max']) if always_round_up_flag: slots_needed = int(np.ceil((exposure_time + overhead) / (self.slot_size * 60.0))) else: slots_needed = int(np.round((exposure_time + overhead) / (self.slot_size * 60.0))) if slots_needed < 1: slots_needed = 1 slots_needed_for_exposure_dict[starid] = slots_needed return slots_needed_for_exposure_dict
[docs] def _build_observability(self): """ Determine the indices of the slots where targets are observable using the Access object. Returns: observability (dict): a dictionary where keys are the star names and values are the indices of the slots where the target is observable """ # Create Access object with parameters from config # Use KPFCC-specific Access class for Keck Observatory, otherwise use base Access class if 'Keck' in self.observatory or 'keck' in self.observatory.lower(): from astroq.queue.kpfcc import Access_KPFCC AccessClass = Access_KPFCC else: AccessClass = ac.Access self.access_obj = AccessClass( semester_start_date=self.semester_start_date, semester_length=self.semester_length, n_nights_in_semester=self.n_nights_in_semester, today_starting_night=self.today_starting_night, current_day=self.current_day, all_dates_dict=self.all_dates_dict, all_dates_array=self.all_dates_array, slot_size=self.slot_size, slots_needed_for_exposure_dict=self.slots_needed_for_exposure_dict, custom_file=self.custom_file, allocation_file=self.allocation_file, past_history=self.past_history, output_directory=self.output_directory, run_weather_loss=self.run_weather_loss, run_band3=self.run_band3, observatory_string=self.observatory, request_frame=self.requests_frame ) # Store the full access record array for later use self.access_record = self.access_obj.produce_ultimate_map() observability = self.access_obj.observability(self.requests_frame, access=self.access_record) return observability
[docs] def _compute_slots_required_for_exposure(self, exposure_time, slot_size, always_round_up_flag): """ Compute the number of slots required for a given exposure time. Args: exposure_time: Exposure time in minutes slot_size: Slot size in minutes always_round_up_flag: If True, always round up to the next slot Returns: slots_needed (int): the number of slots required for the given exposure time """ time_per_slot = slot_size / 60.0 # Convert slot_size to hours if always_round_up_flag: return math.ceil(exposure_time / time_per_slot) else: return round(exposure_time / time_per_slot)
[docs] def constraint_throttle(self): """ Not described in Lubin et al. 2025. Ensure that no program is scheduled for more time than they bring to the queue, within a grace amount, decided by the observatory. There is one constraint per program. """ logs.info("Constraint: Throttling over-requested programs.") merged_df = self.requests_frame[['unique_id', 'program_code']].copy() program_frame = pd.read_csv(self.programs_file) # Convert past_history dict to DataFrame and merge past_df = pd.DataFrame([{**{'unique_id': k}, **v._asdict()} for k, v in self.past_history.items()], columns=['unique_id', 'name', 'date_last_observed', 'total_n_exposures', 'total_n_visits', 'total_n_unique_nights', 'total_open_shutter_time', 'n_obs_on_nights', 'n_visits_on_nights']) # Ensure unique_id is string type to match merged_df past_df['unique_id'] = past_df['unique_id'].astype(str) past_df['past_slots_used'] = past_df['total_n_exposures'] * past_df['unique_id'].map(self.slots_needed_for_exposure_dict).fillna(1) merged_df = merged_df.merge(past_df[['unique_id', 'past_slots_used']], on='unique_id', how='left') merged_df['past_slots_used'] = merged_df['past_slots_used'].fillna(0) # Merge with program_frame (program_code from requests matches 'program' from program_frame) merged_df = merged_df.merge( program_frame[['program', 'nights']], left_on='program_code', right_on='program', how='inner' ) # Use groupby to calculate past_used_slots per program past_used_slots_by_program = ( merged_df.groupby('program')['past_slots_used'] .sum() .to_dict() ) # Use groupby to create program_requests_map program_requests_map = ( merged_df.groupby('program')['unique_id'] .apply(set) .to_dict() ) # Iterate through programs to build Gurobi constraints for program in program_frame['program']: program_row = program_frame.loc[program_frame['program'] == program].iloc[0] awarded_time_slots = (program_row['nights'] * self.hours_per_night * 60) / self.slot_size awarded_time_slots_grace = int(awarded_time_slots * self.throttle_grace) max_slots_allowed_for_scheduling_program = gp.quicksum( self.Yrds[r, d, s] * self.slots_needed_for_exposure_dict[r] for r, d, s in self.observability_tuples if r in program_requests_map.get(program, set()) ) # Get past used slots for this program (default to 0 if not found) past_used_slots = past_used_slots_by_program.get(program, 0) # Graceful failure. If a program has already been over-observed, we don't want the model to be infeasible. if awarded_time_slots_grace < past_used_slots: logs.warning(f"Program {program} has already been over-observed. Setting award equal to past used.") logs.warning(f"Therefore, Program {program}, will not be scheduled for any additional observations.") awarded_time_slots_grace = past_used_slots lhs = awarded_time_slots_grace - past_used_slots rhs = max_slots_allowed_for_scheduling_program self.model.addConstr(lhs >= rhs, f'throttle_program_{program}')
[docs] def constraint_reserve_multislot_exposures(self): """ See Constraint 1 in Lubin et al. 2025. Reserve multiple time slots for exposures that require more than one time slot to complete, and ensure that no other observations are scheduled during these slots. """ logs.info("Constraint: Reserve slots for for multi-slot exposures.") max_t_visit = self.strategy.t_visit.max() # longest exposure time R_ds = self.observability.groupby(['d','s'])['unique_id'].apply(set).to_dict() R_geq_t_visit = {} # dictionary of requests where t_visit is greater than or equal to t_visit strategy = self.strategy for t_visit in range(1,max_t_visit+1): R_geq_t_visit[t_visit] = set(strategy[strategy.t_visit >= t_visit]['unique_id']) for d,s in self.observability.drop_duplicates(['d','s'])[['d','s']].itertuples(index=False, name=None): rhs = [] for delta in range(1,max_t_visit): s_shift = s - delta if (d,s_shift) in R_ds: rhs.extend(self.Yrds[r,d,s_shift] for r in R_ds[d,s_shift] & R_geq_t_visit[delta+1]) lhs = 1 - gp.quicksum(self.Yrds[r,d,s] for r in R_ds[d,s]) rhs = gp.quicksum(rhs) self.model.addConstr(lhs >= rhs, f'reserve_multislot_{d}d_{s}s')
[docs] def constraint_enforce_internight_cadence(self): """ See Constraint 3 in Lubin et al. 2025. Ensure that the minimum number of days pass between consecutive observations of a given target. If a target is scheduled for observation on a given date, prevent it from being scheduled again until the minimum number of days have passed. """ logs.info("Constraint: Enforce inter-night cadence.") # Get all (d',s') pairs for a request that must be zero if a (d,s) pair is selected # Ensure tau_inter is numeric before the query joiner_for_intercadence = self.joiner.copy() joiner_for_intercadence['tau_inter'] = pd.to_numeric(joiner_for_intercadence['tau_inter'], errors='coerce') intercadence = pd.merge( joiner_for_intercadence.drop_duplicates(['unique_id','d',]), joiner_for_intercadence[['unique_id','d','s']], suffixes=['','3'],on=['unique_id'] ).query('d + 0 < d3 < d + tau_inter') self.intercadence_tracker = intercadence.groupby(['unique_id','d'])[['d3','s3']].agg(list) # When inter-night cadence is 1, there will be no keys to constrain so skip # While the if/else statement would catch these, by shrinking the list here we do fewer # total steps in the loop. intercadence_valid_tuples = self.joiner.copy()[self.joiner['tau_inter'] > 1] # We don't want duplicate slots on day d because we only need this constraint once per day # With duplicates, the same constraint would be applied to (r, d, s) and (r, d, s+1) which # is superfluous since we are summing over tonight's slots intercadence_valid_tuples = intercadence_valid_tuples.drop_duplicates(subset=['unique_id', 'd']) for i, row in intercadence_valid_tuples.iterrows(): constrained_slots_tonight = np.array(self.slots_on_day_for_r.loc[(row.unique_id2, row.d2)][0]) # Get all slots for pair (r, d) where valid if (row.unique_id, row.d) in self.intercadence_tracker.index: slots_to_constrain_future = self.intercadence_tracker.loc[(row.unique_id2, row.d2)] ds_pairs = zip(list(np.array(slots_to_constrain_future.d3).flatten()), list(np.array(slots_to_constrain_future.s3).flatten())) lhs = gp.quicksum(self.Yrds[row.unique_id,row.d,s2] for s2 in constrained_slots_tonight)/row.n_intra_max rhs = 1 - (gp.quicksum(self.Yrds[row.unique_id,d3,s3] for d3, s3 in ds_pairs)) self.model.addConstr(lhs <= rhs, 'enforce_internight_cadence_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s")
[docs] def constraint_fix_previous_objective(self, epsilon=0.03): """ Bonus round constraint: not featured in Lubin et al. 2025. This constraint ensures that the objective function value calculated during Round 2 be within a given tolerance of the Round 1 value. This constraint ensures that Round 2 result in only small changes to the optimal solution found in Round 1. """ logs.info("Constraint: Fixing the previous solution's objective value.") lhs = gp.quicksum(self.theta[name] for name in self.requests_frame['unique_id']) rhs = self.model.objval + epsilon self.model.addConstr(lhs <= rhs, 'fix_previous_objective')
[docs] def set_objective_maximize_slots_used(self): """ Bonus round constraint: not featured in Lubin et al. 2025. In Round 2, maximize the number of filled slots, i.e., slots during which an exposure occurs. """ logs.info("Objective: Maximize the number of slots used.") self.model.setObjective(gp.quicksum(self.slots_needed_for_exposure_dict[id]*self.Yrds[id,d,s] for id, d, s in self.observability_tuples), GRB.MAXIMIZE)
[docs] def set_objective_minimize_theta_time_normalized(self): """ See Equation 1 in Lubin et al. 2025. Minimize the total shortfall for the number of targets that receive their requested number of observations, weighted by the time needed to complete one observation. """ self.model.setObjective(gp.quicksum(self.theta[name]*self.slots_needed_for_exposure_dict[name] for name in self.schedulable_requests), GRB.MINIMIZE)
[docs] def constraint_build_theta_multivisit(self): """ See Equation 3 in Lubin et al. 2025. Definition of the "shortfall" matrix, Theta. The shortfall is defined for each target, giving for each target the difference between the number of requested nights for that target and the sum of the past and future scheduled observations of that target. """ logs.info("Constraint: Build theta variable") for starid in self.schedulable_requests: idx = self.requests_frame.index[self.requests_frame['unique_id']==starid][0] lhs1 = self.theta[starid] rhs1 = 0 self.model.addConstr(lhs1 >= rhs1, 'greater_than_zero_shortfall_' + str(starid)) # Get all (d,s) pairs for which this request is valid. all_d = list(set(list(self.all_valid_ds_for_request.loc[starid].d))) available = list(zip(list(self.all_valid_ds_for_request.loc[starid].d), list(self.all_valid_ds_for_request.loc[starid].s))) lhs2 = self.theta[starid] rhs2 = self.requests_frame['n_inter_max'][idx] - self.past_nights_observed_dict[starid] - (gp.quicksum(self.Yrds[starid, d, s] for d, s in available))/self.requests_frame['n_intra_max'][idx] self.model.addConstr(lhs2 >= rhs2, 'greater_than_nobs_shortfall_' + str(starid))
[docs] def constraint_set_max_desired_unique_nights_Wrd(self): """ See Constraint 2 in Lubin et al. 2025. Limit the number of observations scheduled for a given target to the maximum value provided by the PI. This constraint may later be relaxed if Round 2 of scheduling is invoked. """ logs.info("Constraint: Set desired maximum observations.") for name in self.multi_visit_requests: all_d = list(set(list(self.all_valid_ds_for_request.loc[name].d))) lhs1 = gp.quicksum(self.Wrd[name, d] for d in all_d) rhs1 = self.desired_max_obs_allowed_dict[name] self.model.addConstr(lhs1 <= rhs1, 'max_desired_unique_nights_for_request_' + str(name)) for name in self.single_visit_requests: available = list(zip(list(self.all_valid_ds_for_request.loc[name].d), list(self.all_valid_ds_for_request.loc[name].s))) lhs2 = gp.quicksum(self.Yrds[name, d, s] for d, s in available) rhs2 = self.desired_max_obs_allowed_dict[name] self.model.addConstr(lhs2 <= rhs2, 'max_desired_unique_nights_for_request_' + str(name))
[docs] def remove_constraint_set_max_desired_unique_nights_Wrd(self): """ Bonus round constraint: not featured in Lubin et al. 2025. Remove the maximum number of observations set by constraints_set_max_desired_unique_nights_Wrd. """ logs.info("Constraint: Removing previous maximum observations constraint.") for name in self.multi_visit_requests: rm_const = self.model.getConstrByName("max_desired_unique_nights_for_request_" + str(name)) self.model.remove(rm_const)
[docs] def constraint_set_max_absolute_unique_nights_Wrd(self): """ Bonus round constraint: not featured in Lubin et al. 2025. Set the maximum number of observations for a target to 150% of the original requested number. """ logs.info("Constraint: Set absolute maximum observations.") for name in self.multi_visit_requests: all_d = list(set(list(self.all_valid_ds_for_request.loc[name].d))) lhs = gp.quicksum(self.Wrd[name, d] for d in all_d) rhs = self.absolute_max_obs_allowed_dict[name] self.model.addConstr(lhs <= rhs, 'max_absolute_unique_nights_for_request_' + str(name))
[docs] def constraint_build_enforce_intranight_cadence(self): """ Constraint 4 in Lubin et al. 2025. Ensure that the minimum number of hours pass between consecutive observations of a given target on the same night. If a target is scheduled for observation at a given time, prevent it from being scheduled again until the minimum number of hours have passed. """ logs.info("Constraint: Enforce intra-night cadence.") # get all combos of slots that must be constrained if given slot is scheduled # # When intra-night cadence is 0, there will be no keys to constrain so skip intracadence_valid_tuples = self.joiner.copy()[self.joiner['n_intra_max'] > 1] intracadence_frame = pd.merge( intracadence_valid_tuples.drop_duplicates(['unique_id','d','s']), intracadence_valid_tuples[['unique_id','d','s']], suffixes=['','3'],on=['unique_id', 'd'] ).query('s + 0 < s3 < s + tau_intra') intracadence_frame = intracadence_frame.groupby(['unique_id','d','s'])[['s3']].agg(list) for i, row in intracadence_valid_tuples.iterrows(): if (row.unique_id, row.d, row.s) in intracadence_frame.index: # Get all slots tonight which are too soon after given slot for another visit slots_to_constrain_tonight_intra = list(intracadence_frame.loc[(row.unique_id, row.d, row.s)][0]) lhs = self.Yrds[row.unique_id,row.d,row.s] rhs = self.Wrd[row.unique_id, row.d] - (gp.quicksum(self.Yrds[row.unique_id,row.d,s3] for s3 in slots_to_constrain_tonight_intra)) self.model.addConstr(lhs <= rhs, 'enforce_intranight_cadence_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s")
[docs] def constraint_set_min_max_visits_per_night(self): """ See Constraint 5 in Lubin et al. 2025. Require that the number of scheduled visits to a target in a given night falls between the minimum and maximum values supplied by the PI. """ logs.info("Constraint: Bound minimum and maximum visits per night.") intracadence_frame_on_day = self.joiner.copy().drop_duplicates(subset=['unique_id', 'd']) grouped_s = self.joiner.copy().groupby(['unique_id', 'd'])['s'].unique().reset_index() grouped_s.set_index(['unique_id', 'd'], inplace=True) for i, row in intracadence_frame_on_day.iterrows(): all_valid_slots_tonight = list(grouped_s.loc[(row.unique_id, row.d)]['s']) if row.unique_id in self.multi_visit_requests: lhs1 = gp.quicksum(self.Yrds[row.unique_id, row.d,s3] for s3 in all_valid_slots_tonight) rhs1 = row.n_intra_max*self.Wrd[row.unique_id, row.d] self.model.addConstr(lhs1 <= rhs1, 'enforce_max_visits1_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s") lhs2 = gp.quicksum(self.Yrds[row.unique_id,row.d,s3] for s3 in all_valid_slots_tonight) rhs2 = row.n_intra_min*self.Wrd[row.unique_id, row.d] self.model.addConstr(lhs2 >= rhs2, 'enforce_min_visits_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s") else: lhs3 = gp.quicksum(self.Yrds[row.unique_id,row.d,s3] for s3 in all_valid_slots_tonight) rhs3 = row.n_intra_max self.model.addConstr(lhs3 <= rhs3, 'enforce_min_visits_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s")
[docs] def optimize_model(self): """ Solve the Gurobi model. Returns: None """ logs.debug("Begin model solve.") t1 = time.time() self.model.params.TimeLimit = self.solve_time_limit self.model.Params.OutputFlag = self.gurobi_output # Allow stop at 5% gap to prevent from spending lots of time on marginally better solution self.model.params.MIPGap = self.solve_max_gap # More aggressive presolve gives better solution in shorter time self.model.params.Presolve = 2 #self.model.params.Presolve = 0 self.model.update() self.model.optimize() if self.model.Status == GRB.INFEASIBLE: logs.critical('Model remains infeasible. Searching for invalid constraints.') search = self.model.computeIIS() logs.critical("Printing bad constraints:") for c in self.model.getConstrs(): if c.IISConstr: logs.critical('%s' % c.ConstrName) for c in self.model.getGenConstrs(): if c.IISGenConstr: logs.critical('%s' % c.GenConstrName) else: logs.debug("Model Successfully Solved.") logs.info("Time to finish solver: {:.3f}".format(time.time()-t1))
[docs] def run_model(self): """ Construct and solve the Gurobi model. Returns: None """ self.round_info = 'Round1' self.build_model_round1() self.optimize_model() self.serialize_results_csv() if self.run_bonus_round: self.round_info = 'Round2' self.build_model_round2() self.optimize_model() self.serialize_results_csv() logs.info("Scheduling complete, clear skies!")
[docs] def build_model_round1(self): """ Implement the constraints and objective function for Round 1 as described in Lubin et al. 2025. Returns: None """ t1 = time.time() self.constraint_reserve_multislot_exposures() self.constraint_enforce_internight_cadence() self.constraint_set_max_desired_unique_nights_Wrd() self.constraint_build_enforce_intranight_cadence() self.constraint_set_min_max_visits_per_night() self.constraint_build_theta_multivisit() self.constraint_throttle() self.set_objective_minimize_theta_time_normalized() logs.info(f"Time to build constraints: {np.round(time.time()-t1,3):.3f}")
[docs] def build_model_round2(self): """ Implement the constraints and objective function for Round 2. Not described in Lubin et al. 2025. Returns: None """ t1 = time.time() self.remove_constraint_set_max_desired_unique_nights_Wrd() self.constraint_set_max_absolute_unique_nights_Wrd() self.constraint_fix_previous_objective() self.set_objective_maximize_slots_used() logs.info(f"Time to build constraints: {np.round(time.time()-t1,3):.3f}")
[docs] def serialize_results_csv(self): """ Serialize the results to a CSV file. Returns: None """ logs.debug("Building human readable schedule.") serialized_schedule = io.serialize_schedule(self.Yrds, self) self.serialized_schedule = serialized_schedule today_idx = self.all_dates_dict[self.current_day] selected = [k[0] for k, v in self.Yrds.items() if v.x > 0 and k[1] == today_idx] selected = list(set(selected)) selected_df = self.requests_frame[self.requests_frame['unique_id'].isin(selected)].copy() selected_df.to_csv(os.path.join(self.output_directory, 'request_selected.csv'), index=False) self.to_hdf5()
[docs] def to_hdf5(self, hdf5_path=None): """ Save the SemesterPlanner object to an HDF5 file. Args: hdf5_path (str, optional): Path to save the HDF5 file. If None, saves to output_directory/semester_planner.h5 """ if hdf5_path is None: hdf5_path = os.path.join(self.output_directory, 'semester_planner.h5') # Remove existing file if it exists if os.path.exists(hdf5_path): os.remove(hdf5_path) # Define serialization mappings # Format: (hdf5_key, attribute_name, data_type, conversion_func) # data_type: 'scalar', 'string', 'array', 'dict_json', 'dataframe', 'structured_array', 'list' # DataFrames (saved using pandas HDF5 support) dataframe_attrs = [ ('requests_frame', 'requests_frame', 'dataframe', None), ('requests_frame_all', 'requests_frame_all', 'dataframe', None), ('serialized_schedule', 'serialized_schedule', 'dataframe', None), ] # Scalar/string attributes scalar_attrs = [ ('current_day', 'current_day', 'string', None), ('semester_start_date', 'semester_start_date', 'string', None), ('semester_length', 'semester_length', 'scalar', None), ('semester_letter', 'semester_letter', 'string', None), ('slot_size', 'slot_size', 'scalar', None), ('n_slots_in_night', 'n_slots_in_night', 'scalar', None), ('n_nights_in_semester', 'n_nights_in_semester', 'scalar', None), ('n_slots_in_semester', 'n_slots_in_semester', 'scalar', None), ('today_starting_slot', 'today_starting_slot', 'scalar', None), ('today_starting_night', 'today_starting_night', 'scalar', None), ('run_band3', 'run_band3', 'scalar', None), ('observatory', 'observatory', 'string', None), ('output_directory', 'output_directory', 'string', None), ('run_weather_loss', 'run_weather_loss', 'scalar', None), ('solve_time_limit', 'solve_time_limit', 'scalar', None), ('gurobi_output', 'gurobi_output', 'scalar', None), ('solve_max_gap', 'solve_max_gap', 'scalar', None), ('max_bonus', 'max_bonus', 'scalar', None), ('run_bonus_round', 'run_bonus_round', 'scalar', None), ('semester_directory', 'semester_directory', 'string', None), ('custom_file', 'custom_file', 'string', None), ('allocation_file', 'allocation_file', 'string', None), ('throttle_grace', 'throttle_grace', 'scalar', None), ('hours_per_night', 'hours_per_night', 'scalar', None), ] # Dictionary attributes (saved as JSON) dict_attrs = [ ('all_dates_dict_json', 'all_dates_dict', 'dict_json', None), ('slots_needed_for_exposure_dict_json', 'slots_needed_for_exposure_dict', 'dict_json', None), ('past_nights_observed_dict_json', 'past_nights_observed_dict', 'dict_json', None), ('past_history_json', 'past_history', 'past_history_dict', None), ] # Array/list attributes array_attrs = [ ('all_dates_array', 'all_dates_array', 'list', None), ('access_record', 'access_record', 'structured_array', None), ] # Save DataFrames first for hdf5_key, attr_name, data_type, _ in dataframe_attrs: df = getattr(self, attr_name) df.to_hdf(hdf5_path, key=hdf5_key, mode='a', format='table') # Save other attributes using h5py with h5py.File(hdf5_path, 'a') as f: # Save scalar/string attributes for hdf5_key, attr_name, data_type, _ in scalar_attrs: value = getattr(self, attr_name) f.attrs[hdf5_key] = value # Save dictionary attributes for hdf5_key, attr_name, data_type, _ in dict_attrs: if data_type == 'past_history_dict': # Special handling for past_history (StarHistory namedtuples) past_history = getattr(self, attr_name) past_history_serialized = {} for star_id, star_hist in past_history.items(): past_history_serialized[star_id] = { 'name': star_hist.name, 'date_last_observed': star_hist.date_last_observed, 'total_n_exposures': star_hist.total_n_exposures, 'total_n_visits': star_hist.total_n_visits, 'total_n_unique_nights': star_hist.total_n_unique_nights, 'total_open_shutter_time': star_hist.total_open_shutter_time, 'n_obs_on_nights': star_hist.n_obs_on_nights, 'n_visits_on_nights': star_hist.n_visits_on_nights, 'exposure_start_times': getattr(star_hist, 'exposure_start_times', []), } f.attrs[hdf5_key] = json.dumps(past_history_serialized) else: # Regular dictionary value = getattr(self, attr_name) f.attrs[hdf5_key] = json.dumps(value) # Save array/list attributes for hdf5_key, attr_name, data_type, _ in array_attrs: if data_type == 'list': value = getattr(self, attr_name) f.create_dataset(hdf5_key, data=np.array(value, dtype='S')) elif data_type == 'structured_array': access_record = getattr(self, attr_name) # Save each field of the structured array for field_name in access_record.dtype.names: f.create_dataset(f'{hdf5_key}/{field_name}', data=access_record[field_name], compression='gzip') logs.info(f"SemesterPlanner saved to HDF5: {hdf5_path}") return hdf5_path
[docs] @classmethod def from_hdf5(cls, hdf5_path): """ Load a SemesterPlanner object from an HDF5 file. Args: hdf5_path (str): Path to the HDF5 file Returns: SemesterPlanner: Reconstructed SemesterPlanner object """ import json from astroq.history import StarHistory # Create a new instance without calling __init__ instance = cls.__new__(cls) # Define deserialization mappings (inverse of to_hdf5) # Format: (hdf5_key, attribute_name, data_type, conversion_func) # DataFrames (loaded using pandas HDF5 support) dataframe_attrs = [ ('requests_frame', 'requests_frame', 'dataframe', None), ('requests_frame_all', 'requests_frame_all', 'dataframe', None), ('serialized_schedule', 'serialized_schedule', 'dataframe', None), ] # Scalar/string attributes scalar_attrs = [ ('current_day', 'current_day', 'string', None), ('semester_start_date', 'semester_start_date', 'string', None), ('semester_length', 'semester_length', 'scalar', None), ('semester_letter', 'semester_letter', 'string', None), ('slot_size', 'slot_size', 'scalar', None), ('n_slots_in_night', 'n_slots_in_night', 'scalar', None), ('n_nights_in_semester', 'n_nights_in_semester', 'scalar', None), ('n_slots_in_semester', 'n_slots_in_semester', 'scalar', None), ('today_starting_slot', 'today_starting_slot', 'scalar', None), ('today_starting_night', 'today_starting_night', 'scalar', None), ('run_band3', 'run_band3', 'scalar', None), ('observatory', 'observatory', 'string', None), ('output_directory', 'output_directory', 'string', None), ('run_weather_loss', 'run_weather_loss', 'scalar', None), ('solve_time_limit', 'solve_time_limit', 'scalar', None), ('gurobi_output', 'gurobi_output', 'scalar', None), ('solve_max_gap', 'solve_max_gap', 'scalar', None), ('max_bonus', 'max_bonus', 'scalar', None), ('run_bonus_round', 'run_bonus_round', 'scalar', None), ('semester_directory', 'semester_directory', 'string', None), ('custom_file', 'custom_file', 'string', None), ('allocation_file', 'allocation_file', 'string', None), ] # Dictionary attributes (loaded from JSON) dict_attrs = [ ('all_dates_dict_json', 'all_dates_dict', 'dict_json', None), ('slots_needed_for_exposure_dict_json', 'slots_needed_for_exposure_dict', 'dict_json', None), ('past_nights_observed_dict_json', 'past_nights_observed_dict', 'dict_json', None), ('past_history_json', 'past_history', 'past_history_dict', None), ] # Array/list attributes array_attrs = [ ('all_dates_array', 'all_dates_array', 'list', None), ('access_record', 'access_record', 'structured_array', None), ] # Load DataFrames first for hdf5_key, attr_name, data_type, _ in dataframe_attrs: setattr(instance, attr_name, pd.read_hdf(hdf5_path, key=hdf5_key)) # Load other attributes from HDF5 with h5py.File(hdf5_path, 'r') as f: # Load scalar/string attributes for hdf5_key, attr_name, data_type, _ in scalar_attrs: setattr(instance, attr_name, f.attrs[hdf5_key]) # Optional: backwards compat for HDF5 files saved before these were stored instance.throttle_grace = float(f.attrs.get('throttle_grace', 1.25)) instance.hours_per_night = float(f.attrs.get('hours_per_night', 12.0)) # Load dictionary attributes for hdf5_key, attr_name, data_type, _ in dict_attrs: data = json.loads(f.attrs[hdf5_key]) if data_type == 'past_history_dict': # Special handling for past_history (reconstruct StarHistory namedtuples) past_history = {} for star_id, hist_data in data.items(): star_hist = StarHistory( name=hist_data['name'], date_last_observed=hist_data['date_last_observed'], total_n_exposures=hist_data['total_n_exposures'], total_n_visits=hist_data['total_n_visits'], total_n_unique_nights=hist_data['total_n_unique_nights'], total_open_shutter_time=hist_data['total_open_shutter_time'], n_obs_on_nights=hist_data['n_obs_on_nights'], n_visits_on_nights=hist_data['n_visits_on_nights'], exposure_start_times=hist_data.get('exposure_start_times', []) ) past_history[star_id] = star_hist setattr(instance, attr_name, past_history) else: # Regular dictionary setattr(instance, attr_name, data) # Load array/list attributes for hdf5_key, attr_name, data_type, _ in array_attrs: if data_type == 'list': data = f[hdf5_key][:] setattr(instance, attr_name, [d.decode('utf-8') if isinstance(d, bytes) else d for d in data]) elif data_type == 'structured_array': # Reconstruct structured array from saved fields field_names = list(f[hdf5_key].keys()) # Load all field data first field_data = {} for field_name in field_names: field_data[field_name] = f[f'{hdf5_key}/{field_name}'][:] # Determine the number of records (first dimension of first field) first_field = field_data[field_names[0]] n_records = first_field.shape[0] # Create dtype list with proper shapes for multidimensional fields dtype_list = [] for field_name in field_names: data = field_data[field_name] if data.ndim == 1: # 1D field: just use the dtype dtype_list.append((field_name, data.dtype)) else: # Multidimensional field: include shape (excluding first dimension) dtype_list.append((field_name, data.dtype, data.shape[1:])) # Create structured array and populate it struct_array = np.zeros(n_records, dtype=dtype_list) for field_name in field_names: struct_array[field_name] = field_data[field_name] # Convert to recarray so we can use dot notation setattr(instance, attr_name, struct_array.view(np.recarray)) # Recreate access_obj using the loaded parameters # Use KPFCC-specific Access class for Keck Observatory, otherwise use base Access class if 'Keck' in instance.observatory or 'keck' in instance.observatory.lower(): from astroq.queue.kpfcc import Access_KPFCC AccessClass = Access_KPFCC else: AccessClass = ac.Access instance.access_obj = AccessClass( semester_start_date=instance.semester_start_date, semester_length=instance.semester_length, n_nights_in_semester=instance.n_nights_in_semester, today_starting_night=instance.today_starting_night, current_day=instance.current_day, all_dates_dict=instance.all_dates_dict, all_dates_array=instance.all_dates_array, slot_size=instance.slot_size, slots_needed_for_exposure_dict=instance.slots_needed_for_exposure_dict, custom_file=instance.custom_file, allocation_file=instance.allocation_file, past_history=instance.past_history, output_directory=instance.output_directory, run_weather_loss=instance.run_weather_loss, run_band3=instance.run_band3, observatory_string=instance.observatory, request_frame=instance.requests_frame ) logs.info(f"SemesterPlanner loaded from HDF5: {hdf5_path}") return instance
[docs] def add_twilights(self): """Add 20-minute buffer to allocation times that match 12-degree twilight.""" observatory = self.config.get('global', 'observatory') keck = apl.Observer.at_site(observatory) allocation_df = pd.read_csv(self.allocation_file) for idx, row in allocation_df.iterrows(): # Get date from start time (first 10 chars = YYYY-MM-DD) date_str = str(row['start'])[:10] day = Time(date_str, format='iso', scale='utc') # Get 12-degree twilight times evening_12 = keck.twilight_evening_nautical(day, which='next') morning_12 = keck.twilight_morning_nautical(day, which='next') # Check if start time matches evening twilight (within 10 min) start_time = Time(row['start']) if abs(start_time - evening_12) <= TimeDelta(10, format='jd') / 1440: # 10 minutes adjusted_start = start_time - TimeDelta(20, format='jd') / 1440 allocation_df.loc[idx, 'start'] = adjusted_start.strftime('%Y-%m-%dT%H:%M') logs.info(f"Adjusted start time for {date_str}: subtracted 20 min") # Check if stop time matches morning twilight (within 10 min) stop_time = Time(row['stop']) if abs(stop_time - morning_12) <= TimeDelta(10, format='jd') / 1440: # 10 minutes adjusted_stop = stop_time + TimeDelta(20, format='jd') / 1440 allocation_df.loc[idx, 'stop'] = adjusted_stop.strftime('%Y-%m-%dT%H:%M') logs.info(f"Adjusted stop time for {date_str}: added 20 min") # Save updated allocation file allocation_df.to_csv(self.allocation_file, index=False) logs.info("Allocation file updated with twilight adjustments")