"""
IXPE Data Download Module.
This module provides functions to download IXPE observation data files
from remote servers. It supports both XPQLT-processed files and original
compressed files from the IXPE archive.
"""
import os
import json
import requests
from tqdm import tqdm
from ixpepy import IXPE_DATA
from ixpepy.utils.download_xpqlt_db import download_obsid_db
[docs]
def download_files_from_url(url, destination, filenames, chunk_size=1024*10):
"""Download multiple files from a URL to a destination directory.
Parameters
----------
url : str
Base URL where the files are located (should end with /)
destination : str
Local directory path where files will be saved
filenames : list of str
List of filenames to download from the URL
chunk_size : int, optional
Size of chunks for streaming download (default: 10KB)
Returns
-------
list of str
List of local file paths for the downloaded files
"""
# Ensure destination directory exists
if not os.path.exists(destination):
os.makedirs(destination, exist_ok=True)
# Ensure URL ends with /
if not url.endswith('/'):
url += '/'
downloaded_files = []
for filename in filenames:
# Build full remote URL
remote_url = url + filename
local_path = os.path.join(destination, filename)
print(f"Downloading {filename} from {url}...")
try:
# Stream the file download
response = requests.get(remote_url, stream=True)
response.raise_for_status() # Raise error for bad status codes
# Get file size for progress bar
file_size = int(response.headers.get('Content-Length', 0))
# Check if file already exists with correct size
if os.path.exists(local_path):
local_size = os.path.getsize(local_path)
if local_size == file_size and file_size > 0:
print(f" {filename} already exists, skipping download")
downloaded_files.append(local_path)
continue
# Download with progress bar
with open(local_path, 'wb') as f:
with tqdm(
total=file_size,
unit='B',
unit_scale=True,
unit_divisor=1024,
desc=f" {filename}"
) as pbar:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk: # filter out keep-alive chunks
f.write(chunk)
pbar.update(len(chunk))
downloaded_files.append(local_path)
print(f" {filename} downloaded successfully")
except requests.exceptions.RequestException as e:
print(f" Error downloading {filename}: {e}")
# Continue with other files even if one fails
continue
return downloaded_files
[docs]
def download_obsid(obs_id, destination=None, xpqlt=True, update_db=True):
"""
Download IXPE observation data files for a given observation ID.
This function retrieves event files for all three IXPE detector units (DU1, DU2, DU3)
from the XPQLT server or the original IXPE archive.
Parameters
----------
obs_id : str or int
IXPE observation ID (e.g., "01001099" or 1001099)
destination : str, optional
Local directory where files should be saved. If None, saves to
IXPE_DATA/0{obs_id}. Default is None.
xpqlt : bool, optional
If True, download XPQLT-processed FITS files. If False, download
original compressed files from the IXPE archive. Default is True.
update_db : bool, optional
If True, update the observation database before downloading.
If False, use cached database. Default is True.
Returns
-------
None
Raises
------
RuntimeError
If the observation ID is not found in the database
Notes
-----
XPQLT-processed files are recommended for most analyses as they include
calibrated data products. Original files may be needed for specialized
reprocessing.
Examples
--------
Download XPQLT-processed files for observation 01001099:
>>> download_obsid("01001099")
Download original compressed files to a custom directory:
>>> download_obsid("01001099", destination="/path/to/data", xpqlt=False)
"""
obsid_db_file = download_obsid_db(update=update_db)
with open(obsid_db_file) as f:
obsid_db = json.load(f)
obs_id = str(int(obs_id))
try:
obs = obsid_db[obs_id]
except KeyError:
raise RuntimeError("Obs ID %s not available!" % obs_id)
if xpqlt:
url_base = os.path.dirname(obs["xpqlt_url_fits_du1"]) + '/'
filenames = [os.path.basename(obs["xpqlt_url_fits_du%d" % i]) \
for i in [1,2,3]]
else:
url_base = os.path.dirname(obs["origin_url_gz_du1"])
filenames = [os.path.basename(obs["origin_url_gz_du%d" % i]) \
for i in [1,2,3]]
if destination is None:
destination = os.path.join(IXPE_DATA, f"0{obs_id}")
download_files_from_url(url_base, destination, filenames)
[docs]
def download_source(source_name, destination=None, xpqlt=True, update_db=True):
"""
Download IXPE observation data files for all observations of a given source.
This function retrieves event files for all observations of a specified source
from the XPQLT server or the original IXPE archive. It downloads data for all
three IXPE detector units (DU1, DU2, DU3) for each observation ID associated
with the source.
Parameters
----------
source_name : str
Name of the astronomical source (e.g., "Crab", "Her X-1", "Cyg X-1")
destination : str, optional
Base directory where files should be saved. If None, saves each obsid to
IXPE_DATA/{source_name}/0{obs_id}. Default is None.
xpqlt : bool, optional
If True, download XPQLT-processed FITS files. If False, download
original compressed files from the IXPE archive. Default is True.
update_db : bool, optional
If True, update the source observation database before downloading.
If False, use cached database. Default is True.
Returns
-------
list of str
List of observation IDs that were downloaded
Raises
------
RuntimeError
If the source name is not found in the database
Notes
-----
Files are organized in subdirectories by observation ID. If multiple
observations exist for the same source, all will be downloaded.
Examples
--------
Download all XPQLT-processed observations for the Crab Nebula:
>>> from ixpepy.io.download_ixpe_data import download_source
>>> obsids = download_source("Crab")
Found 2 observation(s) for source 'Crab'
Source type: PWN
RA: 83.633, Dec: 22.015
Extended: True
>>> print(f"Downloaded {len(obsids)} observations: {obsids}")
Downloaded 2 observations: ['1001099', '1001199']
Download original compressed files for Her X-1 to a custom directory:
>>> obsids = download_source("Her X-1", destination="/data/ixpe", xpqlt=False)
Download without updating the database (faster if recently updated):
>>> obsids = download_source("Cyg X-1", update_db=False)
"""
from ixpepy.utils.download_xpqlt_db import download_source_obsid_db
# Download/update the source-obsid mapping database
source_db_file = download_source_obsid_db(update=update_db)
with open(source_db_file) as f:
source_db = json.load(f)
# Check if source exists in database
if source_name not in source_db:
raise RuntimeError(f"Source '{source_name}' not available in database!")
source_info = source_db[source_name]
obsids = source_info.get('obsids', {})
if not obsids:
raise RuntimeError(f"No observations found for source '{source_name}'!")
print(f"Found {len(obsids)} observation(s) for source '{source_name}'")
print(f" Source type: {source_info.get('srctype', 'Unknown')}")
print(f" RA: {source_info.get('ra', 'N/A')}, Dec: {source_info.get('dec', 'N/A')}")
print(f" Extended: {source_info.get('is_extended', False)}")
print("")
# Download data for each observation ID
downloaded_obsids = []
for i, obs_id in enumerate(obsids.keys(), 1):
print(f"Downloading observation {i}/{len(obsids)}: {obs_id}")
# Set destination for this obsid
if destination is None:
obsid_dest = os.path.join(IXPE_DATA, source_name.replace(" ", "_"), f"0{obs_id}")
else:
obsid_dest = os.path.join(destination, f"0{obs_id}")
try:
# Use the existing download_obsid function for each observation
download_obsid(obs_id, destination=obsid_dest, xpqlt=xpqlt, update_db=False)
downloaded_obsids.append(obs_id)
print(f" ✓ Observation {obs_id} downloaded successfully")
except Exception as e:
print(f" ✗ Error downloading observation {obs_id}: {e}")
continue
print("")
print(f"Download complete: {len(downloaded_obsids)}/{len(obsids)} observations successful")
return downloaded_obsids
if __name__ == "__main__":
download_obsid("01001099", destination=None, xpqlt=True, update_db= True)