Source code for ixpepy.io.download_ixpe_data

"""
IXPE Data Download Module.

This module provides functions to download IXPE observation data files
from remote servers. It supports both XPQLT-processed files and original
compressed files from the IXPE archive.
"""

import os
import json

import requests
from tqdm import tqdm

from ixpepy import IXPE_DATA
from ixpepy.utils.download_xpqlt_db import download_obsid_db


[docs] def download_files_from_url(url, destination, filenames, chunk_size=1024*10): """Download multiple files from a URL to a destination directory. Parameters ---------- url : str Base URL where the files are located (should end with /) destination : str Local directory path where files will be saved filenames : list of str List of filenames to download from the URL chunk_size : int, optional Size of chunks for streaming download (default: 10KB) Returns ------- list of str List of local file paths for the downloaded files """ # Ensure destination directory exists if not os.path.exists(destination): os.makedirs(destination, exist_ok=True) # Ensure URL ends with / if not url.endswith('/'): url += '/' downloaded_files = [] for filename in filenames: # Build full remote URL remote_url = url + filename local_path = os.path.join(destination, filename) print(f"Downloading {filename} from {url}...") try: # Stream the file download response = requests.get(remote_url, stream=True) response.raise_for_status() # Raise error for bad status codes # Get file size for progress bar file_size = int(response.headers.get('Content-Length', 0)) # Check if file already exists with correct size if os.path.exists(local_path): local_size = os.path.getsize(local_path) if local_size == file_size and file_size > 0: print(f" {filename} already exists, skipping download") downloaded_files.append(local_path) continue # Download with progress bar with open(local_path, 'wb') as f: with tqdm( total=file_size, unit='B', unit_scale=True, unit_divisor=1024, desc=f" {filename}" ) as pbar: for chunk in response.iter_content(chunk_size=chunk_size): if chunk: # filter out keep-alive chunks f.write(chunk) pbar.update(len(chunk)) downloaded_files.append(local_path) print(f" {filename} downloaded successfully") except requests.exceptions.RequestException as e: print(f" Error downloading {filename}: {e}") # Continue with other files even if one fails continue return downloaded_files
[docs] def download_obsid(obs_id, destination=None, xpqlt=True, update_db=True): """ Download IXPE observation data files for a given observation ID. This function retrieves event files for all three IXPE detector units (DU1, DU2, DU3) from the XPQLT server or the original IXPE archive. Parameters ---------- obs_id : str or int IXPE observation ID (e.g., "01001099" or 1001099) destination : str, optional Local directory where files should be saved. If None, saves to IXPE_DATA/0{obs_id}. Default is None. xpqlt : bool, optional If True, download XPQLT-processed FITS files. If False, download original compressed files from the IXPE archive. Default is True. update_db : bool, optional If True, update the observation database before downloading. If False, use cached database. Default is True. Returns ------- None Raises ------ RuntimeError If the observation ID is not found in the database Notes ----- XPQLT-processed files are recommended for most analyses as they include calibrated data products. Original files may be needed for specialized reprocessing. Examples -------- Download XPQLT-processed files for observation 01001099: >>> download_obsid("01001099") Download original compressed files to a custom directory: >>> download_obsid("01001099", destination="/path/to/data", xpqlt=False) """ obsid_db_file = download_obsid_db(update=update_db) with open(obsid_db_file) as f: obsid_db = json.load(f) obs_id = str(int(obs_id)) try: obs = obsid_db[obs_id] except KeyError: raise RuntimeError("Obs ID %s not available!" % obs_id) if xpqlt: url_base = os.path.dirname(obs["xpqlt_url_fits_du1"]) + '/' filenames = [os.path.basename(obs["xpqlt_url_fits_du%d" % i]) \ for i in [1,2,3]] else: url_base = os.path.dirname(obs["origin_url_gz_du1"]) filenames = [os.path.basename(obs["origin_url_gz_du%d" % i]) \ for i in [1,2,3]] if destination is None: destination = os.path.join(IXPE_DATA, f"0{obs_id}") download_files_from_url(url_base, destination, filenames)
[docs] def download_source(source_name, destination=None, xpqlt=True, update_db=True): """ Download IXPE observation data files for all observations of a given source. This function retrieves event files for all observations of a specified source from the XPQLT server or the original IXPE archive. It downloads data for all three IXPE detector units (DU1, DU2, DU3) for each observation ID associated with the source. Parameters ---------- source_name : str Name of the astronomical source (e.g., "Crab", "Her X-1", "Cyg X-1") destination : str, optional Base directory where files should be saved. If None, saves each obsid to IXPE_DATA/{source_name}/0{obs_id}. Default is None. xpqlt : bool, optional If True, download XPQLT-processed FITS files. If False, download original compressed files from the IXPE archive. Default is True. update_db : bool, optional If True, update the source observation database before downloading. If False, use cached database. Default is True. Returns ------- list of str List of observation IDs that were downloaded Raises ------ RuntimeError If the source name is not found in the database Notes ----- Files are organized in subdirectories by observation ID. If multiple observations exist for the same source, all will be downloaded. Examples -------- Download all XPQLT-processed observations for the Crab Nebula: >>> from ixpepy.io.download_ixpe_data import download_source >>> obsids = download_source("Crab") Found 2 observation(s) for source 'Crab' Source type: PWN RA: 83.633, Dec: 22.015 Extended: True >>> print(f"Downloaded {len(obsids)} observations: {obsids}") Downloaded 2 observations: ['1001099', '1001199'] Download original compressed files for Her X-1 to a custom directory: >>> obsids = download_source("Her X-1", destination="/data/ixpe", xpqlt=False) Download without updating the database (faster if recently updated): >>> obsids = download_source("Cyg X-1", update_db=False) """ from ixpepy.utils.download_xpqlt_db import download_source_obsid_db # Download/update the source-obsid mapping database source_db_file = download_source_obsid_db(update=update_db) with open(source_db_file) as f: source_db = json.load(f) # Check if source exists in database if source_name not in source_db: raise RuntimeError(f"Source '{source_name}' not available in database!") source_info = source_db[source_name] obsids = source_info.get('obsids', {}) if not obsids: raise RuntimeError(f"No observations found for source '{source_name}'!") print(f"Found {len(obsids)} observation(s) for source '{source_name}'") print(f" Source type: {source_info.get('srctype', 'Unknown')}") print(f" RA: {source_info.get('ra', 'N/A')}, Dec: {source_info.get('dec', 'N/A')}") print(f" Extended: {source_info.get('is_extended', False)}") print("") # Download data for each observation ID downloaded_obsids = [] for i, obs_id in enumerate(obsids.keys(), 1): print(f"Downloading observation {i}/{len(obsids)}: {obs_id}") # Set destination for this obsid if destination is None: obsid_dest = os.path.join(IXPE_DATA, source_name.replace(" ", "_"), f"0{obs_id}") else: obsid_dest = os.path.join(destination, f"0{obs_id}") try: # Use the existing download_obsid function for each observation download_obsid(obs_id, destination=obsid_dest, xpqlt=xpqlt, update_db=False) downloaded_obsids.append(obs_id) print(f" ✓ Observation {obs_id} downloaded successfully") except Exception as e: print(f" ✗ Error downloading observation {obs_id}: {e}") continue print("") print(f"Download complete: {len(downloaded_obsids)}/{len(obsids)} observations successful") return downloaded_obsids
if __name__ == "__main__": download_obsid("01001099", destination=None, xpqlt=True, update_db= True)