Source code for utdquake.core.load

from __future__ import annotations

import logging
import shutil
from pathlib import Path
from typing import Dict, Set

import obsplus
import pyarrow.parquet as pq
from .data import download_snapshot
from .config import get_root, get_utdq_paths

logger = logging.getLogger(__name__)

[docs] def validate_eventbank(path: Path) -> bool: """ Validate that a path contains a readable ObsPlus EventBank. Parameters ---------- path : Path Path to the EventBank directory. Returns ------- bool True if the EventBank exists and is readable, False otherwise. """ if not path.exists(): return False try: bank = obsplus.EventBank(str(path)) index = bank.read_index() event_id = bank.read_index().event_id.iloc[0] event = bank.get_events(event_id=event_id) return True except Exception: return False
[docs] def validate_parquet(path: Path) -> bool: """ Validate that a path contains a valid Parquet file. Parameters ---------- path : Path Path to the Parquet file. Returns ------- bool True if the file exists and can be read by PyArrow, False otherwise. """ if not path.exists(): return False try: pq.ParquetFile(path) return True except Exception: return False
[docs] def resolve_missing_components( bank_path: Path, parquets: Dict[str, Path], flags: Dict[str, bool], include_bank: bool ) -> Set[str]: """ Determine which components are missing or invalid locally. Parameters ---------- bank_path : Path Path to the EventBank directory. parquets : dict Dictionary of Parquet file paths for keys 'events', 'stations', 'picks'. flags : dict Dictionary indicating which components to check. include_bank : bool Whether to include bank validation. Returns ------- set Set of missing components (keys) that need to be downloaded. """ missing = set() for key, path in parquets.items(): if flags[key] and not validate_parquet(path): missing.add(key) if include_bank: # DEBUG HERE logger.debug("bank_path = %s", bank_path) logger.debug("bank_path exists = %s", bank_path.exists()) if bank_path.exists(): logger.debug("bank_path contents = %s", list(bank_path.iterdir())) if not validate_eventbank(bank_path): missing.add("banks") return missing
[docs] def cleanup_components(to_download: Set[str], bank_path: Path, parquets: Dict[str, Path]) -> None: """ Remove local files/directories for components that will be re-downloaded. Parameters ---------- to_download : set Components to remove ('banks', 'events', 'stations', 'picks'). bank_path : Path Path to the EventBank directory. parquets : dict Dictionary of Parquet file paths. """ if "banks" in to_download: shutil.rmtree(bank_path, ignore_errors=True) for key in ("events", "stations", "picks"): if key in to_download: path = parquets[key] if path.exists(): if path.is_dir(): shutil.rmtree(path) else: path.unlink()
[docs] def resolve_network_paths( network: str, das: bool = False, include_bank: bool = True, include_events: bool = True, include_stations: bool = True, include_picks: bool = True, include_travel_time: bool = False, max_retries: int = 3, ) -> Dict[str, Path]: """ Ensure that all network data components exist locally and return their paths. If any required components are missing or invalid, attempts to download them using `download_snapshot`. Raises RuntimeError if unable to resolve after retries. Parameters ---------- network : str Network code to resolve. das : bool, optional If True, resolves paths from the DAS dataset. Default is False (standard paths). include_bank : bool, optional Whether to include EventBank. Default is True. include_events : bool, optional Whether to include event Parquet. Default is True. include_stations : bool, optional Whether to include stations Parquet. Default is True. include_picks : bool, optional Whether to include picks Parquet. Default is True. include_travel_time : bool, optional Whether to include travel time model. Default is False (not currently implemented for DAS). max_retries : int, optional Maximum number of download attempts. Default is 2. Returns ------- dict Dictionary of component paths. Keys include 'bank', 'events', 'stations', 'picks'. Raises ------ RuntimeError If network data could not be resolved after `max_retries` attempts. """ network = network.strip() # Paths for EventBank and Parquet components utdq_paths = get_utdq_paths(network,das) bank_path = utdq_paths["banks"] parquets = { "events": utdq_paths["events"], "stations": utdq_paths["stations"], "picks": utdq_paths["picks"], "travel_time": utdq_paths[".utdquake/travel_time"], } flags = { "events": include_events, "stations": include_stations, "picks": include_picks, "travel_time": include_travel_time, } for attempt in range(max_retries): missing = resolve_missing_components( bank_path, parquets, flags, include_bank ) if not missing: return { **({"bank": bank_path} if include_bank else {}), **{k: v for k, v in parquets.items() if flags[k]}, } logger.info( "Resolving network '%s' (attempt %d/%d). Missing: %s", network, attempt + 1, max_retries, missing ) # Remove invalid/missing components before redownloading cleanup_components(missing, bank_path, parquets) # Download missing components download_snapshot( local_dir=get_root(das), networks=network, das=das, include_banks="banks" in missing, include_events="events" in missing, include_stations="stations" in missing, include_picks="picks" in missing, include_travel_time="travel_time" in missing, unzip_banks=True, ) raise RuntimeError( f"Could not resolve network '{network}' after {max_retries} attempts" )