Source code for astroq.access

"""
Module for computing the intersection of the various accessibility maps for all targets for the following constraints:
    - telescope pointing
    - telescope allocation
    - sky brightness
    - moon separation
    - internight cadence from past history
    - PI custom windows
    - simulated weather loss
    - enough time to complete the exposure tonight

The Access class is saved as an attribute of the splan object and used again in plotting.
"""

# Standard library imports
import logging

# Third-party imports
from astropy.utils.iers import conf
conf.auto_max_age = None
import astropy as apy
import astropy.units as u
import astroplan as apl
import numpy as np
import pandas as pd
from astropy.time import Time, TimeDelta
import os

DATADIR = os.path.join(os.path.dirname(os.path.dirname(__file__)),'data')

logs = logging.getLogger(__name__)

# Keck limits -- lets talk about making this a subclass
nays_az_low = 5.3
nays_az_high = 146.2
nays_alt = 33.3
tel_min = 18
tel_max = 85

[docs] class Access: """ The Access class encapsulates all the parameters needed for accessibility computation and provides an object-oriented interface to the accessibility computation. """ def __init__(self, semester_start_date, semester_length, n_nights_in_semester, today_starting_night, current_day, all_dates_dict, all_dates_array, slot_size, slots_needed_for_exposure_dict, custom_file, allocation_file, past_history, output_directory, run_weather_loss, run_band3, observatory_string, request_frame, ): """ Initialize the Access object with explicit parameters. Args: semester_start_date: Start date of the semester semester_length: Total number of nights in the semester n_nights_in_semester: Number of remaining nights in the semester today_starting_night: Starting night number for today current_day: Current day identifier all_dates_dict: Dictionary mapping dates to day numbers all_dates_array: Array of date strings for the semester slot_size: Size of each time slot in minutes slots_needed_for_exposure_dict: Dictionary mapping star names to required slots custom_file: Path to custom times file allocation_file: Path to allocation file past_history: Past observation history output_directory: Directory for output files run_weather_loss: Whether to run weather loss simulation run_band3: Whether to run band 3 (used for not peforming the is_observble step for the football plot) observatory_string: Observatory name/location string request_frame: DataFrame containing request information """ # parameters self.semester_start_date = semester_start_date self.semester_length = semester_length self.slot_size = slot_size self.current_day = current_day self.all_dates_dict = all_dates_dict self.all_dates_array = all_dates_array self.n_nights_in_semester = n_nights_in_semester self.today_starting_night = today_starting_night self.slots_needed_for_exposure_dict = slots_needed_for_exposure_dict self.run_weather_loss = run_weather_loss self.run_band3 = run_band3 # files self.custom_file = custom_file self.allocation_file = allocation_file self.past_history = past_history self.output_directory = output_directory self.request_frame = request_frame # Prepatory work self.start_date = Time(self.semester_start_date,format='iso',scale='utc') self.ntargets = len(self.request_frame) self.nnights = self.semester_length # total nights in the full semester self.nslots = int((24*60)/self.slot_size) # slots in the night self.slot_size_time = TimeDelta(self.slot_size*u.min) self.observatory = apl.Observer.at_site(observatory_string) coords = apy.coordinates.SkyCoord(self.request_frame.ra * u.deg, self.request_frame.dec * u.deg, frame='icrs') self.targets = apl.FixedTarget(name=self.request_frame.unique_id, coord=coords) # Set up time grid for one night, first night of the semester self.daily_start = Time(self.start_date, location=self.observatory.location) self.daily_end = self.daily_start + TimeDelta(1.0, format='jd') # full day from start of first night self.timegrid = Time(np.arange(self.daily_start.jd, self.daily_end.jd, self.slot_size_time.jd), format='jd',location=self.observatory.location) self.timegrid = self.timegrid[np.argsort(self.timegrid.sidereal_time('mean'))] # sort by lst # computing slot midpoint for all nights in semester 2D array (slots, nights) self.slotmidpoints_oneday = self.daily_start + (np.arange(self.nslots) + 0.5) * self.slot_size * u.min days = np.arange(self.nnights) * u.day self.slotmidpoints = (self.slotmidpoints_oneday[np.newaxis,:] + days[:,np.newaxis]) self.DATADIR = DATADIR
[docs] def compute_altaz(self, tel_min): """ Compute boolean mask of is_altaz for targets according to a minimum elevation. May be superceded by a specific compute_altaz method for a specific observatory, see astroq/queue/ modules. Args: tel_min (float): the minimum elevation for the telescope Returns: is_altaz (array): boolean mask of is_altaz for targets """ # Compute base alt/az pattern, shape = (ntargets, nslots) altazes = self.observatory.altaz(self.timegrid, self.targets, grid_times_targets=True) alts = altazes.alt.deg min_elevation = self.request_frame['minimum_elevation'].values # Get PI-desired minimum elevation values min_elevation = np.maximum(min_elevation, tel_min) # Ensure minimum elevation is at least tel_min # Pre-compute the sidereal times for interpolation x = self.timegrid.sidereal_time('mean').value x_new = self.slotmidpoints.sidereal_time('mean').value idx = np.searchsorted(x, x_new, side='left') idx = np.clip(idx, 0, len(x)-1) # Handle edge cases is_altaz0 = np.ones_like(alts, dtype=bool) fail = (alts < tel_min) is_altaz0 &= ~fail # self.is_altaz = np.empty((self.ntargets, self.nnights, self.nslots),dtype=bool) self.is_altaz = is_altaz0[:,idx]
[docs] def compute_future(self): """ Compute boolean mask of is_future for all targets according to today's current_day. Args: Returns: is_altaz (array): boolean mask of is_altaz for targets """ self.is_future = np.ones((self.ntargets, self.nnights, self.nslots),dtype=bool) today_daynumber = self.all_dates_dict[self.current_day] self.is_future[:,:today_daynumber,:] = False
[docs] def compute_moon(self): """ Compute boolean mask of is_moon for all targets according to the moon's position. """ self.is_moon = np.ones_like(self.is_altaz, dtype=bool) moon = apy.coordinates.get_moon(self.slotmidpoints[:,0] , self.observatory.location) # Reshaping uses broadcasting to achieve a (ntarget, night) array ang_dist = apy.coordinates.angular_separation( self.targets.ra.reshape(-1,1), self.targets.dec.reshape(-1,1), moon.ra.reshape(1,-1), moon.dec.reshape(1,-1), ) # Use per-row minimum_moon_separation values instead of hardcoded 30 degrees min_moon_sep = self.request_frame['minimum_moon_separation'].values * u.deg # Convert to degrees self.is_moon = self.is_moon & (ang_dist.to(u.deg) > min_moon_sep[:, np.newaxis])[:, :, np.newaxis]
[docs] def compute_inter(self): """ Compute boolean mask of is_inter for all targets according to the internight cadence. """ # Set to False if internight cadence is violated self.is_inter = np.ones((self.ntargets, self.nnights, self.nslots),dtype=bool) for itarget in range(self.ntargets): name = self.request_frame.iloc[itarget]['unique_id'] if name in self.past_history and self.request_frame.iloc[itarget]['tau_inter'] > 1: inight_start = self.all_dates_dict[self.past_history[name].date_last_observed] inight_stop = min(inight_start + self.request_frame.iloc[itarget]['tau_inter'],self.nnights) self.is_inter[itarget,inight_start:inight_stop,:] = False
[docs] def compute_custom(self): """ Compute boolean mask of is_custom for all targets according to the custom times. """ self.is_custom = np.ones((self.ntargets, self.nnights, self.nslots), dtype=bool) # Handle case where custom file doesn't exist if os.path.exists(self.custom_file): custom_times_frame = pd.read_csv(self.custom_file) # Check if the file has any data rows (not just header) if len(custom_times_frame) > 0: starid_to_index = {name: idx for idx, name in enumerate(self.request_frame['unique_id'])} custom_times_frame['start'] = custom_times_frame['start'].apply(Time) custom_times_frame['stop'] = custom_times_frame['stop'].apply(Time) for _, row in custom_times_frame.iterrows(): starid = row['unique_id'] # Skip if the star is not in the current requests frame if starid not in starid_to_index: #print(f"Warning: Star {row['starname']} with unique_id '{starid}' in custom times file not found in requests frame, skipping") continue mask = (self.slotmidpoints >= row['start']) & (self.slotmidpoints <= row['stop']) star_ind = starid_to_index[starid] current_map = self.is_custom[star_ind] if np.all(current_map): # If all ones, first interval: restrict with AND self.is_custom[star_ind] = mask else: # Otherwise, union with OR self.is_custom[star_ind] = current_map | mask else: print(f"Custom times file not found: {self.custom_file}. Using no custom constraints.")
[docs] def compute_allocated(self): """ Compute boolean mask of is_allocated for all targets according to the allocated times. """ allocated_times_frame = pd.read_csv(self.allocation_file) allocated_times_frame['start'] = allocated_times_frame['start'].apply(Time) allocated_times_frame['stop'] = allocated_times_frame['stop'].apply(Time) allocated_times_map = [] allocated_mask = np.zeros((self.nnights, self.nslots), dtype=bool) for i in range(len(allocated_times_frame)): start_time = allocated_times_frame['start'].iloc[i] stop_time = allocated_times_frame['stop'].iloc[i] mask = (self.slotmidpoints >= start_time) & (self.slotmidpoints <= stop_time) allocated_mask |= mask self.is_allocated_mask = allocated_mask self.is_allocated = np.ones_like(self.is_altaz, dtype=bool) & self.is_allocated_mask[np.newaxis,:,:] # shape = (ntargets, nnights, nslots)
[docs] def compute_clear(self,weather_loss_file=None): """ Compute boolean mask of is_clear for all targets according to the clear times. Args: weather_loss_file: Path to file with weather loss statistics information """ self.is_clear = np.ones_like(self.is_altaz, dtype=bool) if self.run_weather_loss: if weather_loss_file is None: raise ValueError("weather_loss_file is required when run_weather_loss is True") logs.info("Running weather loss model.") self.get_loss_stats(weather_loss_file) self.is_clear = self.simulate_weather_losses(covariance=0.14) self.is_clear = np.tile(self.is_clear[np.newaxis, :, :], (self.ntargets, 1, 1)) else: logs.info("Pretending weather is always clear!") self.is_clear = np.ones((self.ntargets, self.nnights, self.nslots), dtype=bool)
[docs] def produce_ultimate_map(self, running_backup_stars=False): """ Compute boolean mask of is_observable for all targets according to the ultimate map. """ self.compute_altaz(33) self.compute_future() self.compute_moon() self.compute_custom() self.compute_inter() self.compute_allocated() self.compute_clear() self.is_observable_now = np.logical_and.reduce([ self.is_altaz, self.is_future, self.is_moon, self.is_custom, self.is_inter, self.is_allocated, self.is_clear, ]) # the target does not violate any of the observability limits in that specific slot, but # it does not mean it can be started at the slot. retroactively grow mask to accomodate multishot exposures. # Is observable now, self.is_observable = self.is_observable_now.copy() if running_backup_stars == False: for itarget in range(self.ntargets): e_val = self.slots_needed_for_exposure_dict[self.request_frame.iloc[itarget]['unique_id']] if e_val == 1: continue for shift in range(1, e_val): # shifts the is_observable_now array to the left by shift # for is_observable to be true, it must be true for all shifts self.is_observable[itarget, :, :-shift] &= self.is_observable_now[itarget, :, shift:] access = { 'is_altaz': self.is_altaz, 'is_future': self.is_future, 'is_moon': self.is_moon, 'is_custom':self.is_custom, 'is_inter': self.is_inter, 'is_alloc': self.is_allocated, 'is_clear': self.is_clear, 'is_observable_now': self.is_observable_now, 'is_observable': self.is_observable } access_record = np.rec.fromarrays(list(access.values()), names=list(access.keys())) return access_record
[docs] def observability(self, requests_frame, access=None): """ Extract a dictionary of the available indices from the record array returned by produce_ultimate_map Args: requests_frame: DataFrame containing request information access: Optional record array from produce_ultimate_map (if None, this function will compute it) Returns: df (dict): Dictionary where keys are target names and values are lists of available slots per night """ if access is None: access = self.produce_ultimate_map(requests_frame) ntargets, nnights, nslots = access.shape # specify indeces of 3D observability array itarget, inight, islot = np.mgrid[:ntargets,:nnights,:nslots] # define flat table to access maps df = pd.DataFrame( {'itarget':itarget.flatten(), 'inight':inight.flatten(), 'islot':islot.flatten()} ) df['is_observable'] = access.is_observable.flatten() df = pd.merge(requests_frame[['unique_id']].reset_index(drop=True),df,left_index=True,right_on='itarget') namemap = {'starid':'unique_id','inight':'d','islot':'s'} df = df.query('is_observable').rename(columns=namemap)[namemap.values()] return df
[docs] def get_loss_stats(self, weather_loss_file): """ Gather the loss probabilities for each night in the semester from the saved historical weather data. """ historical_weather_data = pd.read_csv(os.path.join(DATADIR,weather_loss_file)) loss_stats_this_semester = [] for i, item in enumerate(self.all_dates_array): ind = historical_weather_data.index[historical_weather_data['Date'] == \ self.all_dates_array[i][5:]].tolist()[0] loss_stats_this_semester.append(historical_weather_data['% Total Loss'][ind]) self.loss_stats_this_semester = loss_stats_this_semester
[docs] def simulate_weather_losses(self, covariance=0.14): """ Simulate nights totally lost to weather using historical data Args: covariance (float): the added percent chance that tomorrow will be lost if today is lost Returns: is_clear (array): Trues represent clear nights, Falses represent weathered nights """ previous_day_was_lost = False is_clear = np.ones((self.semester_length, int((24*60)/ self.slot_size)),dtype=bool) for i in range(len(self.loss_stats_this_semester)): value_to_beat = self.loss_stats_this_semester[i] if previous_day_was_lost: value_to_beat += covariance roll_the_dice = np.random.uniform(0.0,1.0) if roll_the_dice < value_to_beat: # the night is simulated a total loss is_clear[i] = np.zeros(is_clear.shape[1]) # Set all slots to False previous_day_was_lost = True else: previous_day_was_lost = False logs.info(f"Total nights simulated as weathered out: {np.sum(~np.any(is_clear, axis=1))} of {len(is_clear)} nights remaining.") return is_clear
[docs] def build_twilight_allocation_file(semester_planner): """ Build an allocation.csv file where every night of the semester is allocated from evening to morning 12-degree twilight times. This is used exclusively by the football plot in the webapp. Args: semester_planner (SemesterPlanner): a semester planner object from splan.py Returns: twilight_file (str): Path to the created allocation.csv file """ # Create the filename based on semester semester = semester_planner.semester_start_date[:4] + semester_planner.semester_letter data_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data') twilight_file = os.path.join(data_dir, f"{semester}_twilights.csv") # Check if file already exists if os.path.exists(twilight_file): return twilight_file # Create data directory if it doesn't exist os.makedirs(data_dir, exist_ok=True) # Get observatory location observatory = apl.Observer.at_site(semester_planner.observatory) # Create allocation data allocation_data = [] for date_str in semester_planner.all_dates_dict.keys(): # Parse the date date = Time(date_str, format='iso', scale='utc') # Get 12-degree twilight times for this night evening_12 = observatory.twilight_evening_nautical(date, which='next') morning_12 = observatory.twilight_morning_nautical(date, which='next') # Add to allocation data allocation_data.append({ 'start': evening_12.isot, 'stop': morning_12.isot }) # Create DataFrame and save to CSV twilight_df = pd.DataFrame(allocation_data) twilight_df.to_csv(twilight_file, index=False) return twilight_file