"""
Module that defines the SemesterPlanner class. This class is responsible for defining, building, and solving the
Gurobi model for semester-level observation planning. It is nearly completely agnostic to all astronomy knowledge.
"""
# Standard library imports
import logging
import os
import time
import warnings
from configparser import ConfigParser
from datetime import datetime, timedelta
import pickle
import json
import h5py
# Third-party imports
import gurobipy as gp
from gurobipy import GRB
import numpy as np
import pandas as pd
from astropy.time import Time, TimeDelta
import astroplan as apl
# Local imports
import astroq.access as ac
import astroq.history as hs
import astroq.io as io
# Suppress warnings
warnings.filterwarnings('ignore')
logs = logging.getLogger(__name__)
[docs]
class SemesterPlanner(object):
"""
Define the SemesterPlanner object. This is the heart of AstroQ.
This object:
- manages and holds the parameters defined in the config.ini
- constructs additional metadata for easy sharing/storage across functions
- builds the Gurobi model
- defines the constraints
- sets the objective function
- kicks off the model solver
- serializes the results to a csv file
- saves the object to an hdf5 file for use later by the nplan and plot modules
"""
def __init__(self, cf, run_band3):
"""
Initialize the SemesterPlanner object.
Args:
cf (str): the path to the config.ini file
run_band3 (bool): whether to run the band 3 weather loss model (this will be unnecessary in the 2026A semester)
Returns:
None
"""
logs.debug("Building the SemesterPlanner.")
self.start_the_clock = time.time()
# Read config file directly
config = ConfigParser()
config.read(cf)
self.run_band3 = run_band3
self.config = config
# Extract configuration parameters from new format
workdir = str(config.get('global', 'workdir'))
self.semester_directory = workdir
self.current_day = str(config.get('global', 'current_day'))
self.observatory = config.get('global', 'observatory')
self.utc_offset_hours = config.getfloat('global', 'UTCoffset', fallback=-10)
# Get semester parameters from semester section
self.slot_size = config.getint('semester', 'slot_size')
self.run_weather_loss = config.getboolean('semester', 'run_weather_loss')
self.solve_time_limit = config.getint('semester', 'max_solve_time')
self.gurobi_output = config.getboolean('semester', 'show_gurobi_output')
self.solve_max_gap = config.getfloat('semester', 'max_solve_gap')
self.max_bonus = config.getfloat('semester', 'maximum_bonus_size')
self.run_bonus_round = config.getboolean('semester', 'run_bonus_round')
self.semester_start_date = config.get('global', 'semester_start_day')
semester_end_date = config.get('global', 'semester_end_day')
start_date = datetime.strptime(self.semester_start_date, '%Y-%m-%d')
end_date = datetime.strptime(semester_end_date, '%Y-%m-%d')
self.semester_length = int((end_date - start_date).days + 1)
self.semester_letter = config.get('global', 'semester')[-1]
self.throttle_grace = config.getfloat('semester', 'throttle_grace')
self.hours_per_night = config.getfloat('semester', 'hours_per_night')
# Output directory
self.output_directory = os.path.join(workdir, "outputs")
check = os.path.isdir(self.output_directory)
if not check:
os.makedirs(self.output_directory)
# Set up file paths from data section
programs_file_config = str(config.get('data', 'programs_file'))
if os.path.isabs(programs_file_config):
self.programs_file = programs_file_config
else:
self.programs_file = os.path.join(self.semester_directory, programs_file_config)
allocation_file_config = str(config.get('data', 'allocation_file'))
if os.path.isabs(allocation_file_config):
self.allocation_file = allocation_file_config
else:
self.allocation_file = os.path.join(self.semester_directory, allocation_file_config)
# Define the input files
if self.run_band3:
request_file_config = str(config.get('data', 'filler_file'))
self.add_twilights()
else:
request_file_config = str(config.get('data', 'request_file'))
if os.path.isabs(request_file_config):
self.request_file = request_file_config
else:
self.request_file = os.path.join(self.semester_directory, request_file_config)
past_file_config = str(config.get('data', 'past_file'))
if os.path.isabs(past_file_config):
self.past_file = past_file_config
else:
self.past_file = os.path.join(self.semester_directory, past_file_config)
custom_file_config = str(config.get('data', 'custom_file'))
if os.path.isabs(custom_file_config):
self.custom_file = custom_file_config
else:
self.custom_file = os.path.join(self.semester_directory, custom_file_config)
if not os.path.exists(self.request_file):
raise FileNotFoundError(f"Requests file not found: {self.request_file}")
self.requests_frame_all = pd.read_csv(self.request_file)
# splan must only know about the active requests
mask = self.requests_frame_all['inactive'] == False
logs.warning(f"There are {len(self.requests_frame_all[~mask])} inactive of {len(self.requests_frame_all)} requests.")
self.requests_frame = self.requests_frame_all[mask]
self.requests_frame.reset_index(drop=True, inplace=True)
# Data cleaning
# Fill NaN values with defaults --- for now in early 2025B since we had issues with the webform.
# Replace "None" strings with NaN first, then fill with defaults to make sure we get them all
self.requests_frame['n_intra_max'] = self.requests_frame['n_intra_max'].replace('None', np.nan).fillna(1)
self.requests_frame['n_intra_min'] = self.requests_frame['n_intra_min'].replace('None', np.nan).fillna(1)
self.requests_frame['tau_intra'] = self.requests_frame['tau_intra'].replace('None', np.nan).fillna(0)
# Handle weather band columns - process each band column individually
for band_num in [1, 2, 3]:
weather_band_col = f'weather_band_{band_num}'
if weather_band_col in self.requests_frame.columns:
self.requests_frame[weather_band_col] = self.requests_frame[weather_band_col].replace('None', np.nan).fillna(False)
self.requests_frame['unique_id'] = self.requests_frame['unique_id'].astype(str)
self.requests_frame['starname'] = self.requests_frame['starname'].astype(str)
# Build the "strategy" dataframe. Note exptime is in minutes and tau_intra is in hours they are both converted to slots here
strategy = self.requests_frame[['starname', 'unique_id', 'n_intra_min','n_intra_max','n_inter_max','tau_inter']]
strategy['t_visit'] = (self.requests_frame['exptime'] / 60 / self.slot_size).clip(lower=1).round().astype(int)
strategy['tau_intra'] = (self.requests_frame['tau_intra'] * 60 / self.slot_size).round().astype(int)
self.strategy = strategy
# Compile additional data and metadata
self.past_history = hs.process_star_history(
self.past_file, self.utc_offset_hours, observatory=self.observatory
)
self.slots_needed_for_exposure_dict = self._build_slots_required_dictionary()
self.all_dates_dict, self.all_dates_array = self._build_date_dictionary()
self._calculate_slot_info()
# Observability represents the indices of the slots where targets are observable
self.observability = self._build_observability()
self.observability_tuples = list(self.observability.itertuples(index=False, name=None))
# Joiner combines strategy and observability
self.joiner = pd.merge(self.strategy, self.observability, on=['unique_id'])
# add dummy columns for easier joins
self.joiner['unique_id2'] = self.joiner['unique_id']
self.joiner['d2'] = self.joiner['d']
self.joiner['s2'] = self.joiner['s']
# Determine the nights where multi-visit requests are observable and the list of multi-visit requests
self.observability_nights = self.joiner[self.joiner['n_intra_max'] > 1][['unique_id', 'd']].drop_duplicates().copy()
self.multi_visit_requests = list(self.observability_nights['unique_id'].unique())
# Define subsets of requests
self.all_requests = list(self.requests_frame['unique_id'])
self.schedulable_requests = list(self.joiner['unique_id'].unique())
self.single_visit_requests = [item for item in self.schedulable_requests if item not in self.multi_visit_requests]
warncount = 0
for starid in list(self.requests_frame['unique_id']):
if starid not in self.schedulable_requests:
starname = self.requests_frame[self.requests_frame['unique_id']==starid]['starname'].values[0]
#logs.warning("Target " + starname + " with unique id " + starid + " has no valid day/slot pairs and therefore is effectively removed from the model.")
warncount += 1
logs.warning("There are " + str(warncount) + " targets out of " + str(len(list(self.requests_frame['unique_id']))) + " that have no valid day/slot pairs and therefore are effectively removed from the model.")
# Get each request's full list of valid d/s pairs
self.all_valid_ds_for_request = self.joiner.groupby(['unique_id'])[['d', 's']].agg(list)
# Get all requests which are valid in slot (d, s)
requests_valid_for_ds = pd.merge(
self.joiner.drop_duplicates(['d', 's']),
self.joiner[['unique_id', 'd', 's']],
suffixes=['', '3'],
on=['d', 's'])
self.requests_valid_for_ds = requests_valid_for_ds.groupby(['d','s'])[['unique_id3']].agg(list)
# Get all valid d/s pairs
self.valid_ds_pairs = self.joiner.copy()
# Get all requests that require multiple slots to complete one observation
self.multislot_mask = self.joiner.t_visit > 1
self.multi_slot_frame = self.joiner[self.multislot_mask]
# Get all valid slots s for request r on day d
valid_s_for_rd = pd.merge(
self.joiner.drop_duplicates(['unique_id','d',]),
self.joiner[['unique_id','d','s']],
suffixes=['','3'],on=['unique_id']
).query('d == d3')
self.slots_on_day_for_r = valid_s_for_rd.groupby(['unique_id','d'])[['s3']].agg(list)
# Get all request id's that are valid on a given day
self.unique_request_on_day_pairs = self.joiner.copy().drop_duplicates(['unique_id','d'])
# Define the Gurobi model
self.model = gp.Model('Semester_Scheduler')
# Yrds is technically a 1D matrix indexed by tuples.
# But in practice best think of it as a 3D ragged matrix of requests r, nights d, and slots s, with gaps.
# Day d / Slot s for request r will be 1 to indicate starting an exposure for that request in that day/slot
observability_array = list(self.observability.itertuples(index=False, name=None))
self.Yrds = self.model.addVars(observability_array, vtype = GRB.BINARY, name = 'Requests_Slots')
if len(self.observability_nights) != 0:
# Wrd is technically a 1D matrix indexed by tuples.
# But in practice best think of it as a 2D ragged matrix of requests r and nights d, with gaps.
# Night d for request r will be 1 to indicate at least one exposure is scheduled for this night.
# Note that Wrd is only valid for requests r which have at least 2 visits requested in the night.
observability_array_onsky = list(self.observability_nights.itertuples(index=False, name=None))
self.Wrd = self.model.addVars(observability_array_onsky, vtype = GRB.BINARY, name = 'OnSky')
# theta is the "shortfall" variable, continous in natural numbers.
self.theta = self.model.addVars(self.all_requests, name = 'Shortfall')
# Set the max allowed number of observations for each request based on the past history and the requested number of observations
desired_max_obs_allowed_dict = {}
absolute_max_obs_allowed_dict = {}
past_nights_observed_dict = {}
for name in self.all_requests:
idx = self.requests_frame.index[self.requests_frame['unique_id']==name][0]
if name in list(self.past_history.keys()):
past_nights_observed = self.past_history[name].total_n_unique_nights
else:
past_nights_observed = 0
# Safety valve for if the target is over-observed for any reason
# if past_nights_observed > self.requests_frame['n_inter_max'][idx] + \
# int(self.requests_frame['n_inter_max'][idx]*self.max_bonus):
if past_nights_observed > self.requests_frame['n_inter_max'][idx]:
desired_max_obs = past_nights_observed
else:
desired_max_obs = (self.requests_frame['n_inter_max'][idx] - past_nights_observed)
absolute_max_obs = (self.requests_frame['n_inter_max'][idx] - past_nights_observed) \
+ int(self.requests_frame['n_inter_max'][idx]*self.max_bonus)
# second safety valve
if past_nights_observed > absolute_max_obs:
absolute_max_obs = past_nights_observed
past_nights_observed_dict[name] = past_nights_observed
desired_max_obs_allowed_dict[name] = desired_max_obs
absolute_max_obs_allowed_dict[name] = absolute_max_obs
self.desired_max_obs_allowed_dict = desired_max_obs_allowed_dict
self.absolute_max_obs_allowed_dict = absolute_max_obs_allowed_dict
self.past_nights_observed_dict = past_nights_observed_dict
logs.debug("Initializing complete.")
[docs]
def _build_date_dictionary(self):
"""
Construct useful data structures that are used throughout the semester planner.
Returns:
all_dates_dict (dict): a dictionary where keys are the dates in the semester and values are the day index
all_dates_array (list): a list of the dates in the semester
"""
start_date = datetime.strptime(self.semester_start_date, '%Y-%m-%d')
end_date = datetime.strptime(self.semester_start_date, '%Y-%m-%d') + timedelta(days=self.semester_length - 1)
all_dates_dict = {}
all_dates_array = []
current_date = start_date
day_index = 0
while current_date <= end_date:
date_str = current_date.strftime('%Y-%m-%d')
all_dates_dict[date_str] = day_index
all_dates_array.append(date_str)
current_date += timedelta(days=1)
day_index += 1
return all_dates_dict, all_dates_array
[docs]
def _calculate_slot_info(self):
"""
Compute important numbers relating to the quantity of slots.
Returns:
None
"""
# Calculate slots per quarter and night
self.n_slots_in_night = int(24 * 60 / self.slot_size)
# Calculate remaining semester info
self.n_nights_in_semester = len(self.all_dates_dict) - self.all_dates_dict[self.current_day]
self.n_slots_in_semester = self.n_slots_in_night * self.n_nights_in_semester
# Calculate today's starting positions
self.today_starting_slot = self.all_dates_dict[self.current_day] * self.n_slots_in_night
self.today_starting_night = self.all_dates_dict[self.current_day]
[docs]
def _build_slots_required_dictionary(self, always_round_up_flag=False):
"""
Determine the number of slots required to complete for each visit of a given request.
Returns:
slots_needed_for_exposure_dict (dict): a dictionary where keys are the star names and values are the number of slots required for each exposure
"""
slots_needed_for_exposure_dict = {}
for n, row in self.requests_frame.iterrows():
starid = row['unique_id']
exposure_time = float(row['exptime']*row['n_exp'])
overhead = 45*float(row['n_exp'] - 1) + 180*float(row['n_intra_max'])
if always_round_up_flag:
slots_needed = int(np.ceil((exposure_time + overhead) / (self.slot_size * 60.0)))
else:
slots_needed = int(np.round((exposure_time + overhead) / (self.slot_size * 60.0)))
if slots_needed < 1:
slots_needed = 1
slots_needed_for_exposure_dict[starid] = slots_needed
return slots_needed_for_exposure_dict
[docs]
def _build_observability(self):
"""
Determine the indices of the slots where targets are observable using the Access object.
Returns:
observability (dict): a dictionary where keys are the star names and values are the indices of the slots where the target is observable
"""
# Create Access object with parameters from config
# Use KPFCC-specific Access class for Keck Observatory, otherwise use base Access class
if 'Keck' in self.observatory or 'keck' in self.observatory.lower():
from astroq.queue.kpfcc import Access_KPFCC
AccessClass = Access_KPFCC
else:
AccessClass = ac.Access
self.access_obj = AccessClass(
semester_start_date=self.semester_start_date,
semester_length=self.semester_length,
n_nights_in_semester=self.n_nights_in_semester,
today_starting_night=self.today_starting_night,
current_day=self.current_day,
all_dates_dict=self.all_dates_dict,
all_dates_array=self.all_dates_array,
slot_size=self.slot_size,
slots_needed_for_exposure_dict=self.slots_needed_for_exposure_dict,
custom_file=self.custom_file,
allocation_file=self.allocation_file,
past_history=self.past_history,
output_directory=self.output_directory,
run_weather_loss=self.run_weather_loss,
run_band3=self.run_band3,
observatory_string=self.observatory,
request_frame=self.requests_frame
)
# Store the full access record array for later use
self.access_record = self.access_obj.produce_ultimate_map()
observability = self.access_obj.observability(self.requests_frame, access=self.access_record)
return observability
[docs]
def _compute_slots_required_for_exposure(self, exposure_time, slot_size, always_round_up_flag):
"""
Compute the number of slots required for a given exposure time.
Args:
exposure_time: Exposure time in minutes
slot_size: Slot size in minutes
always_round_up_flag: If True, always round up to the next slot
Returns:
slots_needed (int): the number of slots required for the given exposure time
"""
time_per_slot = slot_size / 60.0 # Convert slot_size to hours
if always_round_up_flag:
return math.ceil(exposure_time / time_per_slot)
else:
return round(exposure_time / time_per_slot)
[docs]
def constraint_throttle(self):
"""
Not described in Lubin et al. 2025.
Ensure that no program is scheduled for more time than they bring to the queue,
within a grace amount, decided by the observatory.
There is one constraint per program.
"""
logs.info("Constraint: Throttling over-requested programs.")
merged_df = self.requests_frame[['unique_id', 'program_code']].copy()
program_frame = pd.read_csv(self.programs_file)
# Convert past_history dict to DataFrame and merge
past_df = pd.DataFrame([{**{'unique_id': k}, **v._asdict()} for k, v in self.past_history.items()],
columns=['unique_id', 'name', 'date_last_observed', 'total_n_exposures',
'total_n_visits', 'total_n_unique_nights', 'total_open_shutter_time',
'n_obs_on_nights', 'n_visits_on_nights'])
# Ensure unique_id is string type to match merged_df
past_df['unique_id'] = past_df['unique_id'].astype(str)
past_df['past_slots_used'] = past_df['total_n_exposures'] * past_df['unique_id'].map(self.slots_needed_for_exposure_dict).fillna(1)
merged_df = merged_df.merge(past_df[['unique_id', 'past_slots_used']], on='unique_id', how='left')
merged_df['past_slots_used'] = merged_df['past_slots_used'].fillna(0)
# Merge with program_frame (program_code from requests matches 'program' from program_frame)
merged_df = merged_df.merge(
program_frame[['program', 'nights']],
left_on='program_code',
right_on='program',
how='inner'
)
# Use groupby to calculate past_used_slots per program
past_used_slots_by_program = (
merged_df.groupby('program')['past_slots_used']
.sum()
.to_dict()
)
# Use groupby to create program_requests_map
program_requests_map = (
merged_df.groupby('program')['unique_id']
.apply(set)
.to_dict()
)
# Iterate through programs to build Gurobi constraints
for program in program_frame['program']:
program_row = program_frame.loc[program_frame['program'] == program].iloc[0]
awarded_time_slots = (program_row['nights'] * self.hours_per_night * 60) / self.slot_size
awarded_time_slots_grace = int(awarded_time_slots * self.throttle_grace)
max_slots_allowed_for_scheduling_program = gp.quicksum(
self.Yrds[r, d, s] * self.slots_needed_for_exposure_dict[r]
for r, d, s in self.observability_tuples
if r in program_requests_map.get(program, set())
)
# Get past used slots for this program (default to 0 if not found)
past_used_slots = past_used_slots_by_program.get(program, 0)
# Graceful failure. If a program has already been over-observed, we don't want the model to be infeasible.
if awarded_time_slots_grace < past_used_slots:
logs.warning(f"Program {program} has already been over-observed. Setting award equal to past used.")
logs.warning(f"Therefore, Program {program}, will not be scheduled for any additional observations.")
awarded_time_slots_grace = past_used_slots
lhs = awarded_time_slots_grace - past_used_slots
rhs = max_slots_allowed_for_scheduling_program
self.model.addConstr(lhs >= rhs, f'throttle_program_{program}')
[docs]
def constraint_reserve_multislot_exposures(self):
"""
See Constraint 1 in Lubin et al. 2025.
Reserve multiple time slots for exposures that require
more than one time slot to complete, and ensure that
no other observations are scheduled during these slots.
"""
logs.info("Constraint: Reserve slots for for multi-slot exposures.")
max_t_visit = self.strategy.t_visit.max() # longest exposure time
R_ds = self.observability.groupby(['d','s'])['unique_id'].apply(set).to_dict()
R_geq_t_visit = {} # dictionary of requests where t_visit is greater than or equal to t_visit
strategy = self.strategy
for t_visit in range(1,max_t_visit+1):
R_geq_t_visit[t_visit] = set(strategy[strategy.t_visit >= t_visit]['unique_id'])
for d,s in self.observability.drop_duplicates(['d','s'])[['d','s']].itertuples(index=False, name=None):
rhs = []
for delta in range(1,max_t_visit):
s_shift = s - delta
if (d,s_shift) in R_ds:
rhs.extend(self.Yrds[r,d,s_shift] for r in R_ds[d,s_shift] & R_geq_t_visit[delta+1])
lhs = 1 - gp.quicksum(self.Yrds[r,d,s] for r in R_ds[d,s])
rhs = gp.quicksum(rhs)
self.model.addConstr(lhs >= rhs, f'reserve_multislot_{d}d_{s}s')
[docs]
def constraint_enforce_internight_cadence(self):
"""
See Constraint 3 in Lubin et al. 2025.
Ensure that the minimum number of days pass between
consecutive observations of a given target. If a
target is scheduled for observation on a given date,
prevent it from being scheduled again until the
minimum number of days have passed.
"""
logs.info("Constraint: Enforce inter-night cadence.")
# Get all (d',s') pairs for a request that must be zero if a (d,s) pair is selected
# Ensure tau_inter is numeric before the query
joiner_for_intercadence = self.joiner.copy()
joiner_for_intercadence['tau_inter'] = pd.to_numeric(joiner_for_intercadence['tau_inter'], errors='coerce')
intercadence = pd.merge(
joiner_for_intercadence.drop_duplicates(['unique_id','d',]),
joiner_for_intercadence[['unique_id','d','s']],
suffixes=['','3'],on=['unique_id']
).query('d + 0 < d3 < d + tau_inter')
self.intercadence_tracker = intercadence.groupby(['unique_id','d'])[['d3','s3']].agg(list)
# When inter-night cadence is 1, there will be no keys to constrain so skip
# While the if/else statement would catch these, by shrinking the list here we do fewer
# total steps in the loop.
intercadence_valid_tuples = self.joiner.copy()[self.joiner['tau_inter'] > 1]
# We don't want duplicate slots on day d because we only need this constraint once per day
# With duplicates, the same constraint would be applied to (r, d, s) and (r, d, s+1) which
# is superfluous since we are summing over tonight's slots
intercadence_valid_tuples = intercadence_valid_tuples.drop_duplicates(subset=['unique_id', 'd'])
for i, row in intercadence_valid_tuples.iterrows():
constrained_slots_tonight = np.array(self.slots_on_day_for_r.loc[(row.unique_id2, row.d2)][0])
# Get all slots for pair (r, d) where valid
if (row.unique_id, row.d) in self.intercadence_tracker.index:
slots_to_constrain_future = self.intercadence_tracker.loc[(row.unique_id2, row.d2)]
ds_pairs = zip(list(np.array(slots_to_constrain_future.d3).flatten()), list(np.array(slots_to_constrain_future.s3).flatten()))
lhs = gp.quicksum(self.Yrds[row.unique_id,row.d,s2] for s2 in constrained_slots_tonight)/row.n_intra_max
rhs = 1 - (gp.quicksum(self.Yrds[row.unique_id,d3,s3] for d3, s3 in ds_pairs))
self.model.addConstr(lhs <= rhs, 'enforce_internight_cadence_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s")
[docs]
def constraint_fix_previous_objective(self, epsilon=0.03):
"""
Bonus round constraint: not featured in Lubin et al. 2025.
This constraint ensures that the
objective function value calculated during
Round 2 be within a given tolerance of the
Round 1 value. This constraint ensures that
Round 2 result in only small changes to the
optimal solution found in Round 1.
"""
logs.info("Constraint: Fixing the previous solution's objective value.")
lhs = gp.quicksum(self.theta[name] for name in self.requests_frame['unique_id'])
rhs = self.model.objval + epsilon
self.model.addConstr(lhs <= rhs, 'fix_previous_objective')
[docs]
def set_objective_maximize_slots_used(self):
"""
Bonus round constraint: not featured in Lubin et al. 2025.
In Round 2, maximize the number of filled slots,
i.e., slots during which an exposure occurs.
"""
logs.info("Objective: Maximize the number of slots used.")
self.model.setObjective(gp.quicksum(self.slots_needed_for_exposure_dict[id]*self.Yrds[id,d,s]
for id, d, s in self.observability_tuples), GRB.MAXIMIZE)
[docs]
def set_objective_minimize_theta_time_normalized(self):
"""
See Equation 1 in Lubin et al. 2025.
Minimize the total shortfall for the number
of targets that receive their requested number
of observations, weighted by the time needed
to complete one observation.
"""
self.model.setObjective(gp.quicksum(self.theta[name]*self.slots_needed_for_exposure_dict[name] for name in self.schedulable_requests), GRB.MINIMIZE)
[docs]
def constraint_build_theta_multivisit(self):
"""
See Equation 3 in Lubin et al. 2025.
Definition of the "shortfall" matrix, Theta.
The shortfall is defined for each target,
giving for each target the difference between
the number of requested nights for that target
and the sum of the past and future scheduled
observations of that target.
"""
logs.info("Constraint: Build theta variable")
for starid in self.schedulable_requests:
idx = self.requests_frame.index[self.requests_frame['unique_id']==starid][0]
lhs1 = self.theta[starid]
rhs1 = 0
self.model.addConstr(lhs1 >= rhs1, 'greater_than_zero_shortfall_' + str(starid))
# Get all (d,s) pairs for which this request is valid.
all_d = list(set(list(self.all_valid_ds_for_request.loc[starid].d)))
available = list(zip(list(self.all_valid_ds_for_request.loc[starid].d), list(self.all_valid_ds_for_request.loc[starid].s)))
lhs2 = self.theta[starid]
rhs2 = self.requests_frame['n_inter_max'][idx] - self.past_nights_observed_dict[starid] - (gp.quicksum(self.Yrds[starid, d, s] for d, s in available))/self.requests_frame['n_intra_max'][idx]
self.model.addConstr(lhs2 >= rhs2, 'greater_than_nobs_shortfall_' + str(starid))
[docs]
def constraint_set_max_desired_unique_nights_Wrd(self):
"""
See Constraint 2 in Lubin et al. 2025.
Limit the number of observations scheduled for a given
target to the maximum value provided by the PI. This
constraint may later be relaxed if Round 2 of scheduling
is invoked.
"""
logs.info("Constraint: Set desired maximum observations.")
for name in self.multi_visit_requests:
all_d = list(set(list(self.all_valid_ds_for_request.loc[name].d)))
lhs1 = gp.quicksum(self.Wrd[name, d] for d in all_d)
rhs1 = self.desired_max_obs_allowed_dict[name]
self.model.addConstr(lhs1 <= rhs1, 'max_desired_unique_nights_for_request_' + str(name))
for name in self.single_visit_requests:
available = list(zip(list(self.all_valid_ds_for_request.loc[name].d), list(self.all_valid_ds_for_request.loc[name].s)))
lhs2 = gp.quicksum(self.Yrds[name, d, s] for d, s in available)
rhs2 = self.desired_max_obs_allowed_dict[name]
self.model.addConstr(lhs2 <= rhs2, 'max_desired_unique_nights_for_request_' + str(name))
[docs]
def remove_constraint_set_max_desired_unique_nights_Wrd(self):
"""
Bonus round constraint: not featured in Lubin et al. 2025.
Remove the maximum number of observations set by
constraints_set_max_desired_unique_nights_Wrd.
"""
logs.info("Constraint: Removing previous maximum observations constraint.")
for name in self.multi_visit_requests:
rm_const = self.model.getConstrByName("max_desired_unique_nights_for_request_" + str(name))
self.model.remove(rm_const)
[docs]
def constraint_set_max_absolute_unique_nights_Wrd(self):
"""
Bonus round constraint: not featured in Lubin et al. 2025.
Set the maximum number of observations for a target to
150% of the original requested number.
"""
logs.info("Constraint: Set absolute maximum observations.")
for name in self.multi_visit_requests:
all_d = list(set(list(self.all_valid_ds_for_request.loc[name].d)))
lhs = gp.quicksum(self.Wrd[name, d] for d in all_d)
rhs = self.absolute_max_obs_allowed_dict[name]
self.model.addConstr(lhs <= rhs, 'max_absolute_unique_nights_for_request_' + str(name))
[docs]
def constraint_build_enforce_intranight_cadence(self):
"""
Constraint 4 in Lubin et al. 2025.
Ensure that the minimum number of hours pass between
consecutive observations of a given target on the same
night. If a target is scheduled for observation at
a given time, prevent it from being scheduled again
until the minimum number of hours have passed.
"""
logs.info("Constraint: Enforce intra-night cadence.")
# get all combos of slots that must be constrained if given slot is scheduled
# # When intra-night cadence is 0, there will be no keys to constrain so skip
intracadence_valid_tuples = self.joiner.copy()[self.joiner['n_intra_max'] > 1]
intracadence_frame = pd.merge(
intracadence_valid_tuples.drop_duplicates(['unique_id','d','s']),
intracadence_valid_tuples[['unique_id','d','s']],
suffixes=['','3'],on=['unique_id', 'd']
).query('s + 0 < s3 < s + tau_intra')
intracadence_frame = intracadence_frame.groupby(['unique_id','d','s'])[['s3']].agg(list)
for i, row in intracadence_valid_tuples.iterrows():
if (row.unique_id, row.d, row.s) in intracadence_frame.index:
# Get all slots tonight which are too soon after given slot for another visit
slots_to_constrain_tonight_intra = list(intracadence_frame.loc[(row.unique_id, row.d, row.s)][0])
lhs = self.Yrds[row.unique_id,row.d,row.s]
rhs = self.Wrd[row.unique_id, row.d] - (gp.quicksum(self.Yrds[row.unique_id,row.d,s3] for s3 in slots_to_constrain_tonight_intra))
self.model.addConstr(lhs <= rhs, 'enforce_intranight_cadence_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s")
[docs]
def constraint_set_min_max_visits_per_night(self):
"""
See Constraint 5 in Lubin et al. 2025.
Require that the number of scheduled visits to a target
in a given night falls between the minimum and maximum
values supplied by the PI.
"""
logs.info("Constraint: Bound minimum and maximum visits per night.")
intracadence_frame_on_day = self.joiner.copy().drop_duplicates(subset=['unique_id', 'd'])
grouped_s = self.joiner.copy().groupby(['unique_id', 'd'])['s'].unique().reset_index()
grouped_s.set_index(['unique_id', 'd'], inplace=True)
for i, row in intracadence_frame_on_day.iterrows():
all_valid_slots_tonight = list(grouped_s.loc[(row.unique_id, row.d)]['s'])
if row.unique_id in self.multi_visit_requests:
lhs1 = gp.quicksum(self.Yrds[row.unique_id, row.d,s3] for s3 in all_valid_slots_tonight)
rhs1 = row.n_intra_max*self.Wrd[row.unique_id, row.d]
self.model.addConstr(lhs1 <= rhs1, 'enforce_max_visits1_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s")
lhs2 = gp.quicksum(self.Yrds[row.unique_id,row.d,s3] for s3 in all_valid_slots_tonight)
rhs2 = row.n_intra_min*self.Wrd[row.unique_id, row.d]
self.model.addConstr(lhs2 >= rhs2, 'enforce_min_visits_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s")
else:
lhs3 = gp.quicksum(self.Yrds[row.unique_id,row.d,s3] for s3 in all_valid_slots_tonight)
rhs3 = row.n_intra_max
self.model.addConstr(lhs3 <= rhs3, 'enforce_min_visits_' + row.unique_id + "_" + str(row.d) + "d_" + str(row.s) + "s")
[docs]
def optimize_model(self):
"""
Solve the Gurobi model.
Returns:
None
"""
logs.debug("Begin model solve.")
t1 = time.time()
self.model.params.TimeLimit = self.solve_time_limit
self.model.Params.OutputFlag = self.gurobi_output
# Allow stop at 5% gap to prevent from spending lots of time on marginally better solution
self.model.params.MIPGap = self.solve_max_gap
# More aggressive presolve gives better solution in shorter time
self.model.params.Presolve = 2
#self.model.params.Presolve = 0
self.model.update()
self.model.optimize()
if self.model.Status == GRB.INFEASIBLE:
logs.critical('Model remains infeasible. Searching for invalid constraints.')
search = self.model.computeIIS()
logs.critical("Printing bad constraints:")
for c in self.model.getConstrs():
if c.IISConstr:
logs.critical('%s' % c.ConstrName)
for c in self.model.getGenConstrs():
if c.IISGenConstr:
logs.critical('%s' % c.GenConstrName)
else:
logs.debug("Model Successfully Solved.")
logs.info("Time to finish solver: {:.3f}".format(time.time()-t1))
[docs]
def run_model(self):
"""
Construct and solve the Gurobi model.
Returns:
None
"""
self.round_info = 'Round1'
self.build_model_round1()
self.optimize_model()
self.serialize_results_csv()
if self.run_bonus_round:
self.round_info = 'Round2'
self.build_model_round2()
self.optimize_model()
self.serialize_results_csv()
logs.info("Scheduling complete, clear skies!")
[docs]
def build_model_round1(self):
"""
Implement the constraints and objective function for Round 1 as described in Lubin et al. 2025.
Returns:
None
"""
t1 = time.time()
self.constraint_reserve_multislot_exposures()
self.constraint_enforce_internight_cadence()
self.constraint_set_max_desired_unique_nights_Wrd()
self.constraint_build_enforce_intranight_cadence()
self.constraint_set_min_max_visits_per_night()
self.constraint_build_theta_multivisit()
self.constraint_throttle()
self.set_objective_minimize_theta_time_normalized()
logs.info(f"Time to build constraints: {np.round(time.time()-t1,3):.3f}")
[docs]
def build_model_round2(self):
"""
Implement the constraints and objective function for Round 2. Not described in Lubin et al. 2025.
Returns:
None
"""
t1 = time.time()
self.remove_constraint_set_max_desired_unique_nights_Wrd()
self.constraint_set_max_absolute_unique_nights_Wrd()
self.constraint_fix_previous_objective()
self.set_objective_maximize_slots_used()
logs.info(f"Time to build constraints: {np.round(time.time()-t1,3):.3f}")
[docs]
def serialize_results_csv(self):
"""
Serialize the results to a CSV file.
Returns:
None
"""
logs.debug("Building human readable schedule.")
serialized_schedule = io.serialize_schedule(self.Yrds, self)
self.serialized_schedule = serialized_schedule
today_idx = self.all_dates_dict[self.current_day]
selected = [k[0] for k, v in self.Yrds.items() if v.x > 0 and k[1] == today_idx]
selected = list(set(selected))
selected_df = self.requests_frame[self.requests_frame['unique_id'].isin(selected)].copy()
selected_df.to_csv(os.path.join(self.output_directory, 'request_selected.csv'), index=False)
self.to_hdf5()
[docs]
def to_hdf5(self, hdf5_path=None):
"""
Save the SemesterPlanner object to an HDF5 file.
Args:
hdf5_path (str, optional): Path to save the HDF5 file.
If None, saves to output_directory/semester_planner.h5
"""
if hdf5_path is None:
hdf5_path = os.path.join(self.output_directory, 'semester_planner.h5')
# Remove existing file if it exists
if os.path.exists(hdf5_path):
os.remove(hdf5_path)
# Define serialization mappings
# Format: (hdf5_key, attribute_name, data_type, conversion_func)
# data_type: 'scalar', 'string', 'array', 'dict_json', 'dataframe', 'structured_array', 'list'
# DataFrames (saved using pandas HDF5 support)
dataframe_attrs = [
('requests_frame', 'requests_frame', 'dataframe', None),
('requests_frame_all', 'requests_frame_all', 'dataframe', None),
('serialized_schedule', 'serialized_schedule', 'dataframe', None),
]
# Scalar/string attributes
scalar_attrs = [
('current_day', 'current_day', 'string', None),
('semester_start_date', 'semester_start_date', 'string', None),
('semester_length', 'semester_length', 'scalar', None),
('semester_letter', 'semester_letter', 'string', None),
('slot_size', 'slot_size', 'scalar', None),
('n_slots_in_night', 'n_slots_in_night', 'scalar', None),
('n_nights_in_semester', 'n_nights_in_semester', 'scalar', None),
('n_slots_in_semester', 'n_slots_in_semester', 'scalar', None),
('today_starting_slot', 'today_starting_slot', 'scalar', None),
('today_starting_night', 'today_starting_night', 'scalar', None),
('run_band3', 'run_band3', 'scalar', None),
('observatory', 'observatory', 'string', None),
('output_directory', 'output_directory', 'string', None),
('run_weather_loss', 'run_weather_loss', 'scalar', None),
('solve_time_limit', 'solve_time_limit', 'scalar', None),
('gurobi_output', 'gurobi_output', 'scalar', None),
('solve_max_gap', 'solve_max_gap', 'scalar', None),
('max_bonus', 'max_bonus', 'scalar', None),
('run_bonus_round', 'run_bonus_round', 'scalar', None),
('semester_directory', 'semester_directory', 'string', None),
('custom_file', 'custom_file', 'string', None),
('allocation_file', 'allocation_file', 'string', None),
('throttle_grace', 'throttle_grace', 'scalar', None),
('hours_per_night', 'hours_per_night', 'scalar', None),
]
# Dictionary attributes (saved as JSON)
dict_attrs = [
('all_dates_dict_json', 'all_dates_dict', 'dict_json', None),
('slots_needed_for_exposure_dict_json', 'slots_needed_for_exposure_dict', 'dict_json', None),
('past_nights_observed_dict_json', 'past_nights_observed_dict', 'dict_json', None),
('past_history_json', 'past_history', 'past_history_dict', None),
]
# Array/list attributes
array_attrs = [
('all_dates_array', 'all_dates_array', 'list', None),
('access_record', 'access_record', 'structured_array', None),
]
# Save DataFrames first
for hdf5_key, attr_name, data_type, _ in dataframe_attrs:
df = getattr(self, attr_name)
df.to_hdf(hdf5_path, key=hdf5_key, mode='a', format='table')
# Save other attributes using h5py
with h5py.File(hdf5_path, 'a') as f:
# Save scalar/string attributes
for hdf5_key, attr_name, data_type, _ in scalar_attrs:
value = getattr(self, attr_name)
f.attrs[hdf5_key] = value
# Save dictionary attributes
for hdf5_key, attr_name, data_type, _ in dict_attrs:
if data_type == 'past_history_dict':
# Special handling for past_history (StarHistory namedtuples)
past_history = getattr(self, attr_name)
past_history_serialized = {}
for star_id, star_hist in past_history.items():
past_history_serialized[star_id] = {
'name': star_hist.name,
'date_last_observed': star_hist.date_last_observed,
'total_n_exposures': star_hist.total_n_exposures,
'total_n_visits': star_hist.total_n_visits,
'total_n_unique_nights': star_hist.total_n_unique_nights,
'total_open_shutter_time': star_hist.total_open_shutter_time,
'n_obs_on_nights': star_hist.n_obs_on_nights,
'n_visits_on_nights': star_hist.n_visits_on_nights,
'exposure_start_times': getattr(star_hist, 'exposure_start_times', []),
}
f.attrs[hdf5_key] = json.dumps(past_history_serialized)
else:
# Regular dictionary
value = getattr(self, attr_name)
f.attrs[hdf5_key] = json.dumps(value)
# Save array/list attributes
for hdf5_key, attr_name, data_type, _ in array_attrs:
if data_type == 'list':
value = getattr(self, attr_name)
f.create_dataset(hdf5_key, data=np.array(value, dtype='S'))
elif data_type == 'structured_array':
access_record = getattr(self, attr_name)
# Save each field of the structured array
for field_name in access_record.dtype.names:
f.create_dataset(f'{hdf5_key}/{field_name}',
data=access_record[field_name],
compression='gzip')
logs.info(f"SemesterPlanner saved to HDF5: {hdf5_path}")
return hdf5_path
[docs]
@classmethod
def from_hdf5(cls, hdf5_path):
"""
Load a SemesterPlanner object from an HDF5 file.
Args:
hdf5_path (str): Path to the HDF5 file
Returns:
SemesterPlanner: Reconstructed SemesterPlanner object
"""
import json
from astroq.history import StarHistory
# Create a new instance without calling __init__
instance = cls.__new__(cls)
# Define deserialization mappings (inverse of to_hdf5)
# Format: (hdf5_key, attribute_name, data_type, conversion_func)
# DataFrames (loaded using pandas HDF5 support)
dataframe_attrs = [
('requests_frame', 'requests_frame', 'dataframe', None),
('requests_frame_all', 'requests_frame_all', 'dataframe', None),
('serialized_schedule', 'serialized_schedule', 'dataframe', None),
]
# Scalar/string attributes
scalar_attrs = [
('current_day', 'current_day', 'string', None),
('semester_start_date', 'semester_start_date', 'string', None),
('semester_length', 'semester_length', 'scalar', None),
('semester_letter', 'semester_letter', 'string', None),
('slot_size', 'slot_size', 'scalar', None),
('n_slots_in_night', 'n_slots_in_night', 'scalar', None),
('n_nights_in_semester', 'n_nights_in_semester', 'scalar', None),
('n_slots_in_semester', 'n_slots_in_semester', 'scalar', None),
('today_starting_slot', 'today_starting_slot', 'scalar', None),
('today_starting_night', 'today_starting_night', 'scalar', None),
('run_band3', 'run_band3', 'scalar', None),
('observatory', 'observatory', 'string', None),
('output_directory', 'output_directory', 'string', None),
('run_weather_loss', 'run_weather_loss', 'scalar', None),
('solve_time_limit', 'solve_time_limit', 'scalar', None),
('gurobi_output', 'gurobi_output', 'scalar', None),
('solve_max_gap', 'solve_max_gap', 'scalar', None),
('max_bonus', 'max_bonus', 'scalar', None),
('run_bonus_round', 'run_bonus_round', 'scalar', None),
('semester_directory', 'semester_directory', 'string', None),
('custom_file', 'custom_file', 'string', None),
('allocation_file', 'allocation_file', 'string', None),
]
# Dictionary attributes (loaded from JSON)
dict_attrs = [
('all_dates_dict_json', 'all_dates_dict', 'dict_json', None),
('slots_needed_for_exposure_dict_json', 'slots_needed_for_exposure_dict', 'dict_json', None),
('past_nights_observed_dict_json', 'past_nights_observed_dict', 'dict_json', None),
('past_history_json', 'past_history', 'past_history_dict', None),
]
# Array/list attributes
array_attrs = [
('all_dates_array', 'all_dates_array', 'list', None),
('access_record', 'access_record', 'structured_array', None),
]
# Load DataFrames first
for hdf5_key, attr_name, data_type, _ in dataframe_attrs:
setattr(instance, attr_name, pd.read_hdf(hdf5_path, key=hdf5_key))
# Load other attributes from HDF5
with h5py.File(hdf5_path, 'r') as f:
# Load scalar/string attributes
for hdf5_key, attr_name, data_type, _ in scalar_attrs:
setattr(instance, attr_name, f.attrs[hdf5_key])
# Optional: backwards compat for HDF5 files saved before these were stored
instance.throttle_grace = float(f.attrs.get('throttle_grace', 1.25))
instance.hours_per_night = float(f.attrs.get('hours_per_night', 12.0))
# Load dictionary attributes
for hdf5_key, attr_name, data_type, _ in dict_attrs:
data = json.loads(f.attrs[hdf5_key])
if data_type == 'past_history_dict':
# Special handling for past_history (reconstruct StarHistory namedtuples)
past_history = {}
for star_id, hist_data in data.items():
star_hist = StarHistory(
name=hist_data['name'],
date_last_observed=hist_data['date_last_observed'],
total_n_exposures=hist_data['total_n_exposures'],
total_n_visits=hist_data['total_n_visits'],
total_n_unique_nights=hist_data['total_n_unique_nights'],
total_open_shutter_time=hist_data['total_open_shutter_time'],
n_obs_on_nights=hist_data['n_obs_on_nights'],
n_visits_on_nights=hist_data['n_visits_on_nights'],
exposure_start_times=hist_data.get('exposure_start_times', [])
)
past_history[star_id] = star_hist
setattr(instance, attr_name, past_history)
else:
# Regular dictionary
setattr(instance, attr_name, data)
# Load array/list attributes
for hdf5_key, attr_name, data_type, _ in array_attrs:
if data_type == 'list':
data = f[hdf5_key][:]
setattr(instance, attr_name, [d.decode('utf-8') if isinstance(d, bytes) else d for d in data])
elif data_type == 'structured_array':
# Reconstruct structured array from saved fields
field_names = list(f[hdf5_key].keys())
# Load all field data first
field_data = {}
for field_name in field_names:
field_data[field_name] = f[f'{hdf5_key}/{field_name}'][:]
# Determine the number of records (first dimension of first field)
first_field = field_data[field_names[0]]
n_records = first_field.shape[0]
# Create dtype list with proper shapes for multidimensional fields
dtype_list = []
for field_name in field_names:
data = field_data[field_name]
if data.ndim == 1:
# 1D field: just use the dtype
dtype_list.append((field_name, data.dtype))
else:
# Multidimensional field: include shape (excluding first dimension)
dtype_list.append((field_name, data.dtype, data.shape[1:]))
# Create structured array and populate it
struct_array = np.zeros(n_records, dtype=dtype_list)
for field_name in field_names:
struct_array[field_name] = field_data[field_name]
# Convert to recarray so we can use dot notation
setattr(instance, attr_name, struct_array.view(np.recarray))
# Recreate access_obj using the loaded parameters
# Use KPFCC-specific Access class for Keck Observatory, otherwise use base Access class
if 'Keck' in instance.observatory or 'keck' in instance.observatory.lower():
from astroq.queue.kpfcc import Access_KPFCC
AccessClass = Access_KPFCC
else:
AccessClass = ac.Access
instance.access_obj = AccessClass(
semester_start_date=instance.semester_start_date,
semester_length=instance.semester_length,
n_nights_in_semester=instance.n_nights_in_semester,
today_starting_night=instance.today_starting_night,
current_day=instance.current_day,
all_dates_dict=instance.all_dates_dict,
all_dates_array=instance.all_dates_array,
slot_size=instance.slot_size,
slots_needed_for_exposure_dict=instance.slots_needed_for_exposure_dict,
custom_file=instance.custom_file,
allocation_file=instance.allocation_file,
past_history=instance.past_history,
output_directory=instance.output_directory,
run_weather_loss=instance.run_weather_loss,
run_band3=instance.run_band3,
observatory_string=instance.observatory,
request_frame=instance.requests_frame
)
logs.info(f"SemesterPlanner loaded from HDF5: {hdf5_path}")
return instance
[docs]
def add_twilights(self):
"""Add 20-minute buffer to allocation times that match 12-degree twilight."""
observatory = self.config.get('global', 'observatory')
keck = apl.Observer.at_site(observatory)
allocation_df = pd.read_csv(self.allocation_file)
for idx, row in allocation_df.iterrows():
# Get date from start time (first 10 chars = YYYY-MM-DD)
date_str = str(row['start'])[:10]
day = Time(date_str, format='iso', scale='utc')
# Get 12-degree twilight times
evening_12 = keck.twilight_evening_nautical(day, which='next')
morning_12 = keck.twilight_morning_nautical(day, which='next')
# Check if start time matches evening twilight (within 10 min)
start_time = Time(row['start'])
if abs(start_time - evening_12) <= TimeDelta(10, format='jd') / 1440: # 10 minutes
adjusted_start = start_time - TimeDelta(20, format='jd') / 1440
allocation_df.loc[idx, 'start'] = adjusted_start.strftime('%Y-%m-%dT%H:%M')
logs.info(f"Adjusted start time for {date_str}: subtracted 20 min")
# Check if stop time matches morning twilight (within 10 min)
stop_time = Time(row['stop'])
if abs(stop_time - morning_12) <= TimeDelta(10, format='jd') / 1440: # 10 minutes
adjusted_stop = stop_time + TimeDelta(20, format='jd') / 1440
allocation_df.loc[idx, 'stop'] = adjusted_stop.strftime('%Y-%m-%dT%H:%M')
logs.info(f"Adjusted stop time for {date_str}: added 20 min")
# Save updated allocation file
allocation_df.to_csv(self.allocation_file, index=False)
logs.info("Allocation file updated with twilight adjustments")