Source code for astroq.nplan

"""
Module for night-level observation planning and optimization.
Uses the TTP package to optimize nightly observation sequences.
See https://github.com/lukehandley/ttp/tree/main for more information about the TTP.
"""

# Standard library imports
import os
import pickle
from configparser import ConfigParser

# Third-party imports
import numpy as np
import pandas as pd
from astropy.time import Time, TimeDelta
        
# Local imports
import astroq.access as ac
import astroq.io as io
from astroq.splan import SemesterPlanner
import astroq.queue.kpfcc as kpfcc

# TTP imports (assuming TTP is installed separately)
import ttp.formatting as formatting
import ttp.telescope as telescope
import ttp.plotting as plotting
import ttp.model as model

class NightPlanner(object):
    """
    Prepare, run, and output the TTP slew-path optimization for one night.

    The planner is built from the config file and requires that a
    SemesterPlanner object has already been created and saved to
    ``<workdir>/outputs/semester_planner.h5``.
    """

    # Name prefix of the dummy targets that block out unallocated gaps.
    _GAP_PREFIX = 'Gap '

    # Placeholder used when a unique_id has no entry in request_selected.csv.
    _NO_NAME = "NO MATCHING NAME"

    # NightPlanner attributes stored as HDF5 file attributes (key == attribute name).
    _PLANNER_ATTRS = (
        'upstream_path',
        'semester_directory',
        'current_day',
        'output_directory',
        'reports_directory',
        'max_solve_gap',
        'max_solve_time',
        'show_gurobi_output',
        'allocation_file',
        'filler_file',
        'custom_file',
    )

    # Solution attributes as (hdf5_key, solution attribute, storage kind).
    # Shared by to_hdf5 and from_hdf5 so the two can never drift apart.
    _SOLUTION_FIELDS = (
        ('solution_plotly_json', 'plotly', 'dict_json'),
        ('solution_times_jd', 'times', 'time_list'),
        ('nightstarts_jd', 'nightstarts', 'time'),
        ('nightends_jd', 'nightends', 'time'),
        ('solution_schedule_json', 'schedule', 'dict_json'),
        ('solution_az_path', 'az_path', 'array'),
        ('solution_alt_path', 'alt_path', 'array'),
    )

    def __init__(self, config_file):
        """
        Initialize the Night Planner with a config file.

        Args:
            config_file: Path to configuration file
        """
        # Parse the config file once; every path and parameter comes from it.
        config = ConfigParser()
        config.read(config_file)

        # Get workdir from global section
        workdir = str(config.get('global', 'workdir'))
        self.upstream_path = workdir
        self.semester_directory = self.upstream_path
        self.current_day = str(config.get('global', 'current_day'))
        self.output_directory = os.path.join(self.upstream_path, "outputs")
        self.reports_directory = os.path.join(self.upstream_path, "outputs")

        # Get night plan specific parameters
        self.max_solve_gap = config.getfloat('night', 'max_solve_gap')
        self.max_solve_time = config.getint('night', 'max_solve_time')
        # NOTE(review): this flag is read and saved but is never passed to
        # the TTP anywhere in this class.
        self.show_gurobi_output = config.getboolean('night', 'show_gurobi_output')

        # Data files may be absolute or relative to the semester directory.
        self.allocation_file = self._resolve_data_path(config, 'allocation_file')
        self.filler_file = self._resolve_data_path(config, 'filler_file')
        self.custom_file = self._resolve_data_path(config, 'custom_file')

        # Load the SemesterPlanner from its HDF5 file instead of creating a new one
        semester_planner_h5 = os.path.join(self.output_directory, 'semester_planner.h5')
        self.semester_planner = SemesterPlanner.from_hdf5(semester_planner_h5)
        self._pull_semester_planner_properties()

        # Compute tonight's allocation gaps: runs of unallocated slots (zeros)
        # between allocated slots (ones).
        # NOTE(review): tonight is indexed here with today_starting_night,
        # while get_first_last_indices uses all_dates_dict[current_day] --
        # confirm that the two always agree.
        access_record = self.semester_planner.access_record
        tonight_index = self.today_starting_night
        slot_size = self.semester_planner.slot_size  # minutes per slot
        # is_alloc shape (ntargets, nnights, nslots); allocation is same for all targets
        tonight_allocated = access_record.is_alloc[0, tonight_index, :]
        allocated = tonight_allocated.astype(np.int8)  # 1D: 1=allocated, 0=not

        self.tonight_allocation_gaps = self._find_allocation_gaps(allocated, slot_size)
        self.tonight_total_unallocated_slots = int(np.sum(1 - allocated))
        self.tonight_total_unallocated_minutes = self.tonight_total_unallocated_slots * slot_size
        self.tonight_gap_unallocated_slots = sum(
            g['n_slots_in_gap'] for g in self.tonight_allocation_gaps
        )
        self.tonight_gap_unallocated_minutes = self.tonight_gap_unallocated_slots * slot_size

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _resolve_data_path(self, config, option):
        """Return the [data] path `option`, anchored at semester_directory if relative."""
        configured = str(config.get('data', option))
        if os.path.isabs(configured):
            return configured
        return os.path.join(self.semester_directory, configured)

    def _pull_semester_planner_properties(self):
        """Mirror selected SemesterPlanner attributes onto this object for consistency."""
        planner = self.semester_planner
        self.semester_start_date = planner.semester_start_date
        self.semester_length = planner.semester_length
        self.all_dates_dict = planner.all_dates_dict
        self.all_dates_array = planner.all_dates_array
        self.today_starting_night = planner.today_starting_night
        self.past_history = planner.past_history
        self.slots_needed_for_exposure_dict = planner.slots_needed_for_exposure_dict
        self.run_weather_loss = planner.run_weather_loss

    @staticmethod
    def _format_clock(minutes):
        """Format a count of minutes as an 'HH:MM' string (fractions truncated)."""
        whole_minutes = int(minutes)
        return f"{whole_minutes // 60:02d}:{whole_minutes % 60:02d}"

    @staticmethod
    def _find_allocation_gaps(allocated, slot_size):
        """
        Describe every run of unallocated slots enclosed by allocated slots.

        Leading and trailing unallocated runs are not gaps.

        Args:
            allocated (np.ndarray): 1D integer array, 1 = allocated, 0 = not
            slot_size: minutes per slot

        Returns:
            list of dict: one entry per gap, in slot order
        """
        diff = np.diff(allocated)
        # diff == -1: 1->0 (gap start); diff == 1: 0->1 (gap end)
        gap_start_slots = np.where(diff == -1)[0] + 1  # first zero slot of each potential gap
        gap_end_slots = np.where(diff == 1)[0]  # last zero slot before each 0->1 transition

        night_totals = {
            'total_slots_in_night': len(allocated),
            'total_allocated_slots': int(np.sum(allocated)),
            'total_nonallocated_slots': int(np.sum(1 - allocated)),
        }

        gaps = []
        for start_slot in gap_start_slots:
            # Pair with the next gap end that is >= start_slot; a start with
            # no such end is a trailing run of zeros, not a gap.
            candidates = gap_end_slots[gap_end_slots >= start_slot]
            if len(candidates) == 0:
                continue
            end_slot = candidates[0]
            n_slots_in_gap = end_slot - start_slot + 1
            start_minutes = start_slot * slot_size
            end_minutes = (end_slot + 1) * slot_size  # end of last zero slot
            gaps.append({
                **night_totals,
                'n_slots_in_gap': n_slots_in_gap,
                'gap_start_slot': start_slot,
                'gap_start_time': NightPlanner._format_clock(start_minutes),
                'gap_stop_slot': end_slot,
                'gap_stop_time': NightPlanner._format_clock(end_minutes),
                'gap_length': n_slots_in_gap * slot_size,
            })
        return gaps

    @classmethod
    def _is_gap(cls, name):
        """Return True when `name` is one of the dummy gap targets."""
        return str(name).startswith(cls._GAP_PREFIX)

    @classmethod
    def _drop_gap_rows(cls, table):
        """
        Remove gap rows from a column-oriented dict keyed by 'Starname'.

        Anything that is not a dict with a 'Starname' column is returned
        unchanged. Filtered columns come back as plain lists.
        """
        if not isinstance(table, dict) or 'Starname' not in table:
            return table
        keep = [i for i, name in enumerate(table['Starname']) if not cls._is_gap(name)]
        return {
            key: ([column[i] for i in keep]
                  if isinstance(column, (list, tuple, np.ndarray)) else column)
            for key, column in table.items()
        }

    @classmethod
    def _drop_gap_entries(cls, extras):
        """Remove gap rows from solution.extras, which may be a DataFrame, a dict, or None."""
        if extras is None:
            return None
        if isinstance(extras, pd.DataFrame):
            if 'Starname' not in extras.columns:
                return extras
            return extras[~extras['Starname'].astype(str).str.startswith(cls._GAP_PREFIX)]
        return cls._drop_gap_rows(extras)

    @classmethod
    def _gap_totals(cls, table):
        """
        Count gap rows and sum their 'Total Exp Time (min)' values.

        Args:
            table: DataFrame, column-oriented dict, or None

        Returns:
            (int, float): number of gap rows and their total exposure minutes
        """
        if table is None:
            return 0, 0.0
        if isinstance(table, pd.DataFrame):
            if 'Starname' not in table.columns:
                return 0, 0.0
            names = table['Starname'].tolist()
            if 'Total Exp Time (min)' in table.columns:
                exposures = table['Total Exp Time (min)'].tolist()
            else:
                exposures = []
        else:
            names = table.get('Starname')
            exposures = table.get('Total Exp Time (min)')
            names = [] if names is None else names
            exposures = [] if exposures is None else exposures

        count = 0
        minutes = 0.0
        for i, name in enumerate(names):
            if cls._is_gap(name):
                count += 1
                if i < len(exposures):
                    minutes += float(exposures[i])
        return count, minutes

    @staticmethod
    def _to_native(value):
        """Recursively convert numpy containers/scalars to JSON-serializable Python types."""
        if isinstance(value, np.ndarray):
            return NightPlanner._to_native(value.tolist())
        if isinstance(value, (list, tuple)):
            return [NightPlanner._to_native(v) for v in value]
        if isinstance(value, dict):
            return {k: NightPlanner._to_native(v) for k, v in value.items()}
        if isinstance(value, np.integer):
            return int(value)
        if isinstance(value, np.floating):
            return float(value)
        if isinstance(value, (np.bool_, bool)):
            return bool(value)
        return value

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def run_ttp(self):
        """
        Prepare the TTP input dataframe by parsing the request_selected.csv file.
        Ensure data is in the correct format for TTP. Then run the TTP
        optimization and write the observing-order and starlist outputs.

        If there is no allocation tonight, or no targets are selected, the
        function prints a message and returns without running the TTP.

        Args:
            None

        Returns:
            bool: True when the TTP ran and outputs were written, False when
            the run was skipped.

        Raises:
            FileNotFoundError: if outputs/request_selected.csv does not exist.
        """
        observers_path = os.path.join(self.semester_directory, 'outputs/')
        os.makedirs(observers_path, exist_ok=True)

        observatory = telescope.Keck1()

        # Get start/stop times from allocation file
        try:
            observation_start_time, observation_stop_time = get_nightly_times_from_allocation(
                self.allocation_file, self.current_day
            )
        except ValueError:
            print(f"No allocation times found for date {self.current_day}. Not running TTP. No night_planner.h5 file will be created.")
            return False
        total_time = np.round((observation_stop_time.jd - observation_start_time.jd) * 24, 3)
        print("Time in Night for Observations: " + str(total_time) + " hours.")

        # Use only request_selected.csv as the source of scheduled targets
        selected_path = os.path.join(self.output_directory, 'request_selected.csv')
        if not os.path.exists(selected_path):
            raise FileNotFoundError(f"{selected_path} not found. Please run the scheduler first.")
        selected_df = pd.read_csv(selected_path)

        # Gracefully fail if no targets are selected (useful on non-"full" bands when not allocated)
        if len(selected_df) == 0:
            print(f"No targets found in {selected_path}. Not running TTP. No night_planner.h5 file will be created.")
            return False

        # Add the first and last available columns to the selected_df for use by the TTP
        first_available, last_available = self.get_first_last_indices(selected_df)
        selected_df['first_available'] = first_available
        selected_df['last_available'] = last_available

        # Fill missing values with defaults (the webform produced "None"
        # strings and NaNs in early 2025B): "None" -> NaN -> default.
        column_defaults = {
            'n_intra_max': 1,
            'n_intra_min': 1,
            'tau_intra': 0.0,
            'jmag': 0.0,
            'gmag': 0.0,
            'pmra': 0.0,
            'pmdec': 0.0,
            'epoch': 0.0,
        }
        for column, default in column_defaults.items():
            selected_df[column] = selected_df[column].replace('None', np.nan).fillna(default)

        # Prepare the TTP input DataFrame (matching the old prepare_for_ttp output)
        to_ttp = pd.DataFrame({
            "Starname": selected_df["unique_id"],
            "RA": selected_df["ra"],
            "Dec": selected_df["dec"],
            "Exposure Time": selected_df["exptime"],
            "Exposures Per Visit": selected_df["n_exp"],
            "Visits In Night": selected_df["n_intra_max"],
            "Intra_Night_Cadence": selected_df["tau_intra"],
            "Priority": 10,  # Default priority, or you can add logic if needed
            "First Available": selected_df["first_available"],
            "Last Available": selected_df["last_available"],
        })

        # Add dummy gap observations when there are unallocated gaps between allocated slots
        if len(self.tonight_allocation_gaps) > 0:
            avg_ra = selected_df["ra"].mean()
            avg_dec = selected_df["dec"].mean()
            tonight_date = self.current_day
            gap_rows = []
            for i, gap in enumerate(self.tonight_allocation_gaps, start=1):
                # Exposure Time in TTP is seconds; gap_length is minutes
                exposure_time_sec = (gap['gap_length'] - self.semester_planner.slot_size) * 60
                gap_rows.append({
                    "Starname": f"Gap {i}",
                    "RA": avg_ra,
                    "Dec": avg_dec,
                    "Exposure Time": exposure_time_sec,
                    "Exposures Per Visit": 1,
                    "Visits In Night": 1,
                    "Intra_Night_Cadence": 0,
                    "Priority": 20,  # always very high priority to ensure it is scheduled
                    "First Available": f"{tonight_date} {gap['gap_start_time']}",
                    "Last Available": f"{tonight_date} {gap['gap_stop_time']}",
                })
            to_ttp = pd.concat([to_ttp, pd.DataFrame(gap_rows)], ignore_index=True)

        filename = os.path.join(self.output_directory, 'ttp_prepared.csv')
        to_ttp.to_csv(filename, index=False)
        target_list = formatting.theTTP(filename, observatory, observation_start_time, observation_stop_time)

        solution = model.TTPModel(target_list, observers_path, runtime=self.max_solve_time, optgap=self.max_solve_gap)
        del solution.gurobi_model  # remove attribute so the object is hdf5 compatible

        # Compute gap stats BEFORE scrubbing (for adjusted TTP statistics)
        gap_exposure_min = 0.0
        gap_count = 0
        if len(self.tonight_allocation_gaps) > 0:
            for table in (solution.plotly, solution.extras):
                n_rows, minutes = self._gap_totals(table)
                gap_count += n_rows
                gap_exposure_min += minutes
        gap_total_min = self.tonight_gap_unallocated_minutes if len(self.tonight_allocation_gaps) > 0 else 0.0
        n_gap_targets = len(self.tonight_allocation_gaps)

        # Remove dummy "Gap X" rows from the solution so they never appear in outputs
        solution.plotly = self._drop_gap_rows(solution.plotly)
        solution.extras = self._drop_gap_entries(solution.extras)
        # Scrub Gap from other solution attributes so nothing references them
        if getattr(solution, 'stars', None) is not None:
            solution.stars = [s for s in solution.stars if not self._is_gap(getattr(s, 'name', ''))]
        if isinstance(getattr(solution, 'schedule', None), dict) and 'Starname' in solution.schedule:
            solution.schedule = self._drop_gap_rows(solution.schedule)

        # Update TTP stats to exclude Gap observations (observing duration, exposing, idle)
        if gap_total_min > 0 or gap_exposure_min > 0:
            solution.dur = max(0, solution.dur - gap_total_min)
            solution.time_exposing = max(0, solution.time_exposing - gap_exposure_min)
            solution.time_idle = max(0, solution.dur - solution.time_exposing - solution.time_slewing)
            solution.num_scheduled = solution.num_scheduled - gap_count

            # Re-print and overwrite TTPstatistics.txt with gap-adjusted stats.
            # NOTE(review): the "- 2" is kept from the original formula;
            # presumably two bookkeeping nodes in solution.N -- confirm.
            n_requested = solution.N - 2 - n_gap_targets
            ttp_stats_path = os.path.join(observers_path, 'TTPstatistics.txt')
            with open(ttp_stats_path, 'w') as f:
                f.write("Stats for TTP Solution (Gap observations excluded)\n")
                f.write("------------------------------------\n")
                f.write(f' Model ran for {solution.solve_time:.2f} seconds\n')
                f.write(f' Observations Requested: {n_requested}\n')
                f.write(f' Observations Scheduled: {solution.num_scheduled}\n')
                f.write("------------------------------------\n")
                f.write(f' Observing Duration (min): {solution.dur:.2f}\n')
                f.write(f' Time Spent Exposing (min): {solution.time_exposing:.2f}\n')
                f.write(f' Time Spent Idle (min): {solution.time_idle:.2f}\n')
                f.write(f' Time Spent Slewing (min): {solution.time_slewing:.2f}\n')
                f.write("------------------------------------\n")
            print('\n------------------------------------')
            print(' (Gap observations excluded from stats)')
            print('------------------------------------')
            print(f' Observations Requested: {n_requested}')
            print(f' Observations Scheduled: {solution.num_scheduled}')
            print('------------------------------------')
            print(f' Observing Duration (min): {solution.dur:.2f}')
            print(f' Time Spent Exposing (min): {solution.time_exposing:.2f}')
            print(f' Time Spent Idle (min): {solution.time_idle:.2f}')
            print(f' Time Spent Slewing (min): {solution.time_slewing:.2f}')
            print('------------------------------------')

        # Add human readable starname to the solution so that it can be used in the plotting functions
        id_to_name = dict(zip(selected_df['unique_id'], selected_df['starname']))
        solution.plotly['human_starname'] = [
            id_to_name.get(uid, self._NO_NAME) for uid in solution.plotly['Starname']
        ]

        self.solution = [solution]

        # 'Start Exposure' is in minutes after observation_start_time;
        # characters 11:16 of the ISO timestamp are the HH:MM part.
        solution.plotly['UTC Start Time'] = [
            str(TimeDelta(start * 60, format='sec') + observation_start_time)[11:16]
            for start in solution.plotly['Start Exposure']
        ]

        numeric_columns = ['Start Exposure', 'First Available', 'Last Available', 'Minutes the from Start of the Night']
        for col in numeric_columns:
            if col in solution.plotly:
                solution.plotly[col] = np.round(np.array(solution.plotly[col]), 2).tolist()

        # First-match name lookup for the observe-order file; ids that are
        # missing from request_selected.csv get the same placeholder as the
        # plotting column instead of raising IndexError.
        first_name_by_id = {}
        for uid, name in zip(selected_df['unique_id'], selected_df['starname']):
            first_name_by_id.setdefault(uid, name)

        # Convert solution.plotly to a DataFrame for easier handling
        observe_order_file = os.path.join(observers_path, f"ObserveOrder_{self.current_day}.txt")
        plotly_df = pd.DataFrame(solution.plotly)
        use_starnames = []
        use_star_ids = []
        use_start_exposures = []
        for uid, start in zip(plotly_df['Starname'], plotly_df['Start Exposure']):
            adjusted_timestamp = TimeDelta(start * 60, format='sec') + observation_start_time
            use_start_exposures.append(str(adjusted_timestamp)[11:16])
            use_starnames.append(first_name_by_id.get(uid, self._NO_NAME))
            use_star_ids.append(str(uid))
        # Extras are listed last with the sentinel time '24:00'.
        extras_df = pd.DataFrame(solution.extras)
        if 'Starname' in extras_df.columns:
            for uid in extras_df['Starname']:
                use_start_exposures.append('24:00')
                use_star_ids.append(str(uid))
                use_starnames.append(first_name_by_id.get(uid, self._NO_NAME))
        use_frame = pd.DataFrame({
            'unique_id': use_star_ids,
            'Target': use_starnames,
            'StartExposure': use_start_exposures,
        })
        use_frame.to_csv(observe_order_file, index=False)

        kpfcc.write_starlist(selected_df, solution.plotly, observation_start_time, solution.extras, [], str(self.current_day), observers_path)
        print("The optimal path through the sky for the selected stars is found. Clear skies!")
        return True

    def get_first_last_indices(self, selected_df):
        """
        Get the first and last available time slots for each target in selected_df.

        Targets that are never observable tonight, or that are not present in
        the semester planner's requests_frame, get the dummy window
        "<current_day> 23:59" for both values so the TTP doesn't break.

        Args:
            selected_df (pd.DataFrame): DataFrame containing selected targets with unique_id column

        Returns:
            first_available_list (list) - "YYYY-MM-DD HH:MM" strings for each target's first available slot
            last_available_list (list) - "YYYY-MM-DD HH:MM" strings for each target's last available slot
        """
        # Get tonight's index from the all_dates_dict
        tonight_index = self.all_dates_dict[self.current_day]
        access_record = self.semester_planner.access_record
        slot_size = self.semester_planner.slot_size  # minutes per slot

        # Map unique_id -> row label of requests_frame, which is used as the
        # target index into the access record.
        # NOTE(review): this assumes requests_frame has a default 0..N-1
        # index so that labels equal positions -- confirm.
        requests_frame = self.semester_planner.requests_frame
        target_to_index = dict(zip(requests_frame['unique_id'], requests_frame.index))

        dummy_window = f"{self.current_day} 23:59"

        first_available = []
        last_available = []
        for target_id in selected_df['unique_id']:
            observable_slots = []
            if target_id in target_to_index:
                # Tonight's observability array for this target (shape: nslots)
                tonight_observable = access_record.is_observable[target_to_index[target_id], tonight_index, :]
                observable_slots = np.where(tonight_observable)[0]

            if len(observable_slots) == 0:
                first_available.append(dummy_window)
                last_available.append(dummy_window)
                continue

            # _format_clock casts to int, so a float slot_size no longer
            # breaks the zero-padded formatting.
            first_clock = self._format_clock(observable_slots[0] * slot_size)
            last_clock = self._format_clock(observable_slots[-1] * slot_size)
            first_available.append(f"{self.current_day} {first_clock}")
            last_available.append(f"{self.current_day} {last_clock}")

        return first_available, last_available

    def to_hdf5(self, hdf5_path=None):
        """
        Save the NightPlanner object to an HDF5 file.

        Any existing file at the target path is deleted first. The
        SemesterPlanner itself is not stored; only the path
        output_directory/semester_planner.h5 is recorded.

        Args:
            hdf5_path (str, optional): Path to save the HDF5 file. If None,
                saves to output_directory/night_planner.h5

        Returns:
            str: the path that was written
        """
        import h5py
        import json

        if hdf5_path is None:
            hdf5_path = os.path.join(self.output_directory, 'night_planner.h5')

        # Remove existing file if it exists
        if os.path.exists(hdf5_path):
            os.remove(hdf5_path)

        solution = self.solution[0]

        # Save solution.extras first (special case - DataFrame or dict)
        extras_is_dict = isinstance(solution.extras, dict)
        if isinstance(solution.extras, pd.DataFrame):
            extras_df = solution.extras
        elif extras_is_dict:
            # Handles empty dicts with empty lists as well
            extras_df = pd.DataFrame(solution.extras)
        else:
            # None (or any other type) is stored as an empty table so the
            # save does not fail with an undefined variable; from_hdf5 will
            # then return an empty DataFrame for solution.extras.
            extras_df = pd.DataFrame({'Starname': []})

        # Always save, even when there are 0 rows:
        # 'fixed' format for empty DataFrames, 'table' for non-empty
        extras_format = 'fixed' if extras_df.empty else 'table'
        extras_df.to_hdf(hdf5_path, key='solution_extras', mode='a', format=extras_format)

        with h5py.File(hdf5_path, 'a') as f:
            # Save extras type flag
            f.attrs['extras_was_dict'] = extras_is_dict

            # Save solution attributes
            for hdf5_key, attr_name, kind in self._SOLUTION_FIELDS:
                value = getattr(solution, attr_name)
                if kind == 'dict_json':
                    serializable = {k: self._to_native(v) for k, v in value.items()}
                    f.attrs[hdf5_key] = json.dumps(serializable)
                elif kind == 'time_list':
                    # List of Time objects -> array of JD
                    f.create_dataset(hdf5_key, data=np.array([t.jd for t in value]))
                elif kind == 'time':
                    # Time object -> JD scalar
                    f.attrs[hdf5_key] = value.jd
                elif kind == 'array':
                    f.create_dataset(hdf5_key, data=np.array(value))

            # Save solution.stars as three parallel datasets (name, ra, dec)
            stars = solution.stars
            f.create_dataset('solution_star_names', data=np.array([s.name for s in stars], dtype='S'))
            f.create_dataset('solution_star_ras', data=np.array([s.target.ra.deg for s in stars]))
            f.create_dataset('solution_star_decs', data=np.array([s.target.dec.deg for s in stars]))

            # Save NightPlanner attributes
            for attr_name in self._PLANNER_ATTRS:
                f.attrs[attr_name] = getattr(self, attr_name)

            # Save path to semester_planner.h5 file
            f.attrs['semester_planner_h5_path'] = os.path.join(self.output_directory, 'semester_planner.h5')

        return hdf5_path

    @classmethod
    def from_hdf5(cls, hdf5_path):
        """
        Load a NightPlanner object from an HDF5 file.

        __init__ is bypassed, so only the attributes written by to_hdf5, the
        properties mirrored from the SemesterPlanner, and the solution are
        restored. The solution is a plain attribute container, not a TTP
        model object.

        Args:
            hdf5_path (str): Path to the HDF5 file

        Returns:
            NightPlanner: Reconstructed NightPlanner object

        Raises:
            AttributeError: if the file has no 'solution_extras' entry.
            FileNotFoundError: if the recorded semester_planner.h5 is missing.
        """
        import h5py
        import json
        import tables  # noqa: F401 -- kept from the original imports
        from astropy.coordinates import SkyCoord
        import astropy.units as u

        # Create a new instance without calling __init__
        instance = cls.__new__(cls)

        # Load solution.extras (special case - DataFrame or dict).
        # Check that it exists first - if not, that's a problem.
        with h5py.File(hdf5_path, 'r') as f:
            if 'solution_extras' not in f:
                raise AttributeError("solution.extras not found in HDF5 file")
        solution_extras_df = pd.read_hdf(hdf5_path, key='solution_extras')

        # Reconstruct solution object
        class SolutionContainer:
            pass

        solution = SolutionContainer()

        with h5py.File(hdf5_path, 'r') as f:
            # Load NightPlanner attributes
            for attr_name in cls._PLANNER_ATTRS:
                setattr(instance, attr_name, f.attrs[attr_name])

            # Load semester_planner
            semester_planner_h5_path = f.attrs['semester_planner_h5_path']
            if not os.path.exists(semester_planner_h5_path):
                raise FileNotFoundError(f"semester_planner.h5 not found at {semester_planner_h5_path}")
            instance.semester_planner = SemesterPlanner.from_hdf5(semester_planner_h5_path)
            instance._pull_semester_planner_properties()

            # Load solution attributes
            for hdf5_key, attr_name, kind in cls._SOLUTION_FIELDS:
                if kind == 'dict_json':
                    data = json.loads(f.attrs[hdf5_key])
                    # Convert lists back to numpy arrays
                    restored = {
                        key: (np.array(value) if isinstance(value, list) else value)
                        for key, value in data.items()
                    }
                    setattr(solution, attr_name, restored)
                elif kind == 'time_list':
                    setattr(solution, attr_name, [Time(jd, format='jd') for jd in f[hdf5_key][:]])
                elif kind == 'time':
                    setattr(solution, attr_name, Time(f.attrs[hdf5_key], format='jd'))
                elif kind == 'array':
                    setattr(solution, attr_name, f[hdf5_key][:])

            # Load solution.stars (reconstruct star objects with targets)
            star_names = [
                name.decode('utf-8') if isinstance(name, bytes) else name
                for name in f['solution_star_names'][:]
            ]
            star_ras = f['solution_star_ras'][:]
            star_decs = f['solution_star_decs'][:]
            solution.stars = []
            for name, ra, dec in zip(star_names, star_ras, star_decs):
                star = SolutionContainer()
                star.name = name
                star.target = SkyCoord(ra=ra * u.deg, dec=dec * u.deg)
                solution.stars.append(star)

            extras_was_dict = bool(f.attrs['extras_was_dict'])

        # Recreate the observatory from the installed ttp package (the
        # module-level `telescope` import) rather than from a machine-specific
        # sys.path entry.
        solution.observatory = telescope.Keck1()

        # Load solution.extras (convert back to dict if needed)
        if extras_was_dict:
            solution.extras = solution_extras_df.to_dict('list')
        else:
            solution.extras = solution_extras_df

        # Scrub any "Gap X" entries from loaded data (handles HDF5 saved before run-time scrubbing)
        solution.plotly = cls._drop_gap_rows(solution.plotly)
        solution.schedule = cls._drop_gap_rows(solution.schedule)
        solution.stars = [s for s in solution.stars if not cls._is_gap(getattr(s, 'name', ''))]
        solution.extras = cls._drop_gap_entries(solution.extras)

        instance.solution = [solution]

        return instance
def get_nightly_times_from_allocation(allocation_file, current_day):
    """
    Look up the allocated observing window for one date in the allocation file.

    Every row's 'start' and 'stop' value is converted to an astropy Time. A
    row belongs to `current_day` when the first ten characters of its start
    time's string form (the YYYY-MM-DD part) equal str(current_day). When
    several rows match, the window spans from the earliest start to the
    latest stop.

    Args:
        allocation_file (str): path to the allocation file
        current_day (str): the date to look for in YYYY-MM-DD format

    Returns:
        start_time (Time object): the start time of the allocation for the current day
        stop_time (Time object): the stop time of the allocation for the current day

    Raises:
        ValueError: if no row starts on `current_day`.
    """
    allocation_table = pd.read_csv(allocation_file)
    # Convert all starts, then all stops, before any filtering.
    window_starts = [Time(value) for value in allocation_table['start']]
    window_stops = [Time(value) for value in allocation_table['stop']]

    wanted_date = str(current_day)
    tonight_windows = [
        (begin, end)
        for begin, end in zip(window_starts, window_stops)
        if str(begin)[:10] == wanted_date
    ]

    if len(tonight_windows) == 0:
        raise ValueError(f"No allocation found for date {wanted_date}")

    # For multiple allocations on the same day, use the earliest start and latest stop
    earliest_start = min(begin for begin, _ in tonight_windows)
    latest_stop = max(end for _, end in tonight_windows)
    return earliest_start, latest_stop