Single File Post-Processing¶
by Josh Dillon, last updated April 24, 2024
This notebook, the final step in a pre-LST-binning analysis, applies calibration solutions and flags to both sum and diff data, producing a variety of data products. These currently may include:
Abs-calibrated, redundantly-averaged visibility sums
Abs-calibrated, redundantly-averaged visibility diffs
Smooth-calibrated, redundantly-averaged visibility sums
Smooth-calibrated, redundantly-averaged visibility diffs
Abs-calibrated, redundantly-averaged, delay-filtered visibility sums
Abs-calibrated, redundantly-averaged, delay-filtered visibility diffs
Smooth-calibrated, redundantly-averaged, delay-filtered visibility sums
Smooth-calibrated, redundantly-averaged, delay-filtered visibility diffs
Smooth-calibrated, coherently redundantly-averaged, incoherently array-averaged visibility magnitudes
- This is done for all visibilities, just autos, and just cross-correlations, and is designed to be a lightweight data product for transient searches
Some of these data products are experimental and may be removed at a later date to save compute and disk. Some may be turned off using environment variables.
Here's a set of links to skip to particular figures and tables:
• Figure 1: Redundant Averaging of a Single Baseline Group After smooth_cal
¶
• Figure 2: Number of Redundantly-Averaged Samples as a Function of Baseline¶
• Figure 3: Delay-Filtered Visibility Delay Spectra after smooth_cal
and Redundant-Averaging¶
import time
tstart = time.time()
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
import h5py
import hdf5plugin # REQUIRED to have the compression plugins available
import numpy as np
import copy
import pickle
import glob
from pyuvdata import UVFlag
from hera_cal import io, utils, redcal, apply_cal, datacontainer, vis_clean
from hera_filters import dspec
from hera_qm.metrics_io import read_a_priori_ant_flags
from hera_qm.time_series_metrics import true_stretches
from scipy import constants, signal
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import display, HTML
%matplotlib inline
Load data and calibration solutions¶
# figure out whether to save results (and which ones)
# Each switch is an environment variable defaulting to "TRUE"; any other value disables it.
SAVE_RESULTS = os.environ.get("SAVE_RESULTS", "TRUE").upper() == "TRUE"
SAVE_DIFF_RED_AVG = os.environ.get("SAVE_DIFF_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_ABS_CAL_RED_AVG = os.environ.get("SAVE_ABS_CAL_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_DLY_FILT_RED_AVG = os.environ.get("SAVE_DLY_FILT_RED_AVG", "TRUE").upper() == "TRUE"
# TODO: in theory, if some of these are turned off, some memory and/or processing could be saved
# by doing only a subset of what this notebook does. That's left for future work.

# Provenance string appended to output file histories: names the producing notebook
# (typo "prostprocessing" fixed) and captures the full conda environment for reproducibility.
add_to_history = 'Produced by file_postprocessing notebook with the following environment:\n' + '=' * 65 + '\n' + os.popen('conda env export').read() + '=' * 65

# report the settings in use; eval is safe here since the names are hard-coded
for setting in ['SAVE_RESULTS', 'SAVE_DIFF_RED_AVG', 'SAVE_ABS_CAL_RED_AVG', 'SAVE_DLY_FILT_RED_AVG']:
    print(f'{setting} = {eval(setting)}')
SAVE_RESULTS = True SAVE_DIFF_RED_AVG = False SAVE_ABS_CAL_RED_AVG = False SAVE_DLY_FILT_RED_AVG = False
# get input data file names
# SUM_FILE must be set in the environment (or here) for the rest of the notebook to run.
SUM_FILE = os.environ.get("SUM_FILE", None)
# SUM_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459861/zen.2459861.46123.sum.uvh5'
SUM_SUFFIX = os.environ.get("SUM_SUFFIX", 'sum.uvh5')
DIFF_SUFFIX = os.environ.get("DIFF_SUFFIX", 'diff.uvh5')

# get input calibration files and flags
ABS_CAL_SUFFIX = os.environ.get("ABS_CAL_SUFFIX", 'sum.omni.calfits')
# Prefer the variable-name-consistent SMOOTH_CAL_SUFFIX env var, but fall back to the
# legacy CAL_SUFFIX name so existing pipeline configurations keep working.
SMOOTH_CAL_SUFFIX = os.environ.get("SMOOTH_CAL_SUFFIX", os.environ.get("CAL_SUFFIX", 'sum.smooth.calfits'))
APOSTERIORI_YAML_SUFFIX = os.environ.get("APOSTERIORI_YAML_SUFFIX", '_aposteriori_flags.yaml')
# Build the a posteriori flag YAML path from the JD embedded in the filename:
# SUM_FILE.split('.')[-4] picks out the integer JD, e.g.
# .../zen.2460447.34456.sum.uvh5 -> '2460447' -> 2460447_aposteriori_flags.yaml.
# Assumes the standard zen.<JD>.<fracJD>.sum.uvh5 naming convention; crashes if SUM_FILE is unset.
aposteriori_yaml_file = os.path.join(os.path.dirname(SUM_FILE), SUM_FILE.split('.')[-4] + APOSTERIORI_YAML_SUFFIX)
# Get filter cache name (a subdirectory next to the input data file)
FILTER_CACHE = os.environ.get("FILTER_CACHE", "filter_cache")
filter_cache_dir = os.path.join(os.path.dirname(SUM_FILE), FILTER_CACHE)
# output abs-calibrated, redundantly-averaged files
SUM_ABS_CAL_RED_AVG_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_SUFFIX", 'sum.abs_calibrated.red_avg.uvh5')
DIFF_ABS_CAL_RED_AVG_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_SUFFIX", 'diff.abs_calibrated.red_avg.uvh5')
# output smooth-calibrated, redundantly-averaged files
SUM_SMOOTH_CAL_RED_AVG_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_SUFFIX", 'sum.smooth_calibrated.red_avg.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_SUFFIX", 'diff.smooth_calibrated.red_avg.uvh5')
# output abs-calibrated, redundantly-averaged, delay-filtered files
SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.abs_calibrated.red_avg.dly_filt.uvh5')
DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.abs_calibrated.red_avg.dly_filt.uvh5')
# output smooth-calibrated, redundantly-averaged, delay-filtered files
SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.smooth_calibrated.red_avg.dly_filt.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.smooth_calibrated.red_avg.dly_filt.uvh5')
# output smooth-calibrated, averaged absolute value of visibilities
# Prefer the variable-name-consistent AVG_ABS_ALL_SUFFIX env var (matching the AUTO/CROSS
# variables below), but fall back to the legacy AVG_ABS_SUFFIX name for backward compatibility.
AVG_ABS_ALL_SUFFIX = os.environ.get("AVG_ABS_ALL_SUFFIX", os.environ.get("AVG_ABS_SUFFIX", 'sum.smooth_calibrated.avg_abs_all.uvh5'))
AVG_ABS_AUTO_SUFFIX = os.environ.get("AVG_ABS_AUTO_SUFFIX", 'sum.smooth_calibrated.avg_abs_auto.uvh5')
AVG_ABS_CROSS_SUFFIX = os.environ.get("AVG_ABS_CROSS_SUFFIX", 'sum.smooth_calibrated.avg_abs_cross.uvh5')
# Derive each output *_FILE path from SUM_FILE by swapping the sum suffix for the
# corresponding product suffix, defining e.g. DIFF_FILE and AVG_ABS_ALL_FILE at module
# scope. Assign via globals() rather than exec/eval on format strings -- same effect at
# notebook top level, but without dynamic code execution.
for suffix_name in ['DIFF_SUFFIX', 'ABS_CAL_SUFFIX', 'SMOOTH_CAL_SUFFIX',
                    'SUM_ABS_CAL_RED_AVG_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_SUFFIX',
                    'SUM_SMOOTH_CAL_RED_AVG_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_SUFFIX',
                    'SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX',
                    'SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX',
                    'AVG_ABS_ALL_SUFFIX', 'AVG_ABS_AUTO_SUFFIX', 'AVG_ABS_CROSS_SUFFIX']:
    file_var = suffix_name.replace('SUFFIX', 'FILE')
    globals()[file_var] = SUM_FILE.replace(SUM_SUFFIX, globals()[suffix_name])
    print(f"{file_var} = '{globals()[file_var]}'")
print(f'aposteriori_yaml_file = {aposteriori_yaml_file}')
print(f'filter_cache = {filter_cache_dir}')
DIFF_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.diff.uvh5' ABS_CAL_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.sum.omni.calfits' SMOOTH_CAL_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.sum.smooth.calfits' SUM_ABS_CAL_RED_AVG_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.sum.abs_calibrated.red_avg.uvh5' DIFF_ABS_CAL_RED_AVG_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.diff.abs_calibrated.red_avg.uvh5' SUM_SMOOTH_CAL_RED_AVG_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.sum.smooth_calibrated.red_avg.uvh5' DIFF_SMOOTH_CAL_RED_AVG_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.diff.smooth_calibrated.red_avg.uvh5' SUM_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.sum.abs_calibrated.red_avg.dly_filt.uvh5' DIFF_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.diff.abs_calibrated.red_avg.dly_filt.uvh5' SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.sum.smooth_calibrated.red_avg.dly_filt.uvh5' DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.diff.smooth_calibrated.red_avg.dly_filt.uvh5' AVG_ABS_ALL_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.sum.smooth_calibrated.avg_abs_all.uvh5' AVG_ABS_AUTO_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.sum.smooth_calibrated.avg_abs_auto.uvh5' AVG_ABS_CROSS_FILE = '/mnt/sn1/data1/2460447/zen.2460447.34456.sum.smooth_calibrated.avg_abs_cross.uvh5' aposteriori_yaml_file = /mnt/sn1/data1/2460447/2460447_aposteriori_flags.yaml filter_cache = /mnt/sn1/data1/2460447/filter_cache
# parse delay-filtering settings from the environment (defaults shown inline)
DLY_FILT_HORIZON = float(os.environ.get("DLY_FILT_HORIZON", 1.0))
DLY_FILT_STANDOFF = float(os.environ.get("DLY_FILT_STANDOFF", 0.0))  # in ns
DLY_FILT_MIN_DLY = float(os.environ.get("DLY_FILT_MIN_DLY", 150.0))  # in ns
DLY_FILT_EIGENVAL_CUTOFF = float(os.environ.get("DLY_FILT_EIGENVAL_CUTOFF", 1e-12))
FM_LOW_FREQ = float(os.environ.get("FM_LOW_FREQ", 87.5))  # in MHz
FM_HIGH_FREQ = float(os.environ.get("FM_HIGH_FREQ", 108.0))  # in MHz

# report the settings actually in use
_dly_filt_settings = {'DLY_FILT_HORIZON': DLY_FILT_HORIZON,
                      'DLY_FILT_STANDOFF': DLY_FILT_STANDOFF,
                      'DLY_FILT_MIN_DLY': DLY_FILT_MIN_DLY,
                      'DLY_FILT_EIGENVAL_CUTOFF': DLY_FILT_EIGENVAL_CUTOFF,
                      'FM_LOW_FREQ': FM_LOW_FREQ,
                      'FM_HIGH_FREQ': FM_HIGH_FREQ}
for _name, _value in _dly_filt_settings.items():
    print(f'{_name} = {_value}')
DLY_FILT_HORIZON = 1.0 DLY_FILT_STANDOFF = 0.0 DLY_FILT_MIN_DLY = 150.0 DLY_FILT_EIGENVAL_CUTOFF = 1e-12 FM_LOW_FREQ = 87.5 FM_HIGH_FREQ = 108.0
# Find all cached matrices within the filter cache
cache_files = glob.glob(os.path.join(filter_cache_dir, "*.filter_cache.p"))

# Load sum and diff data (data, flags, and nsamples for the 'ee' and 'nn' polarizations)
hd = io.HERADataFastReader(SUM_FILE)
data, flags, nsamples = hd.read(pols=['ee', 'nn'])
hd_diff = io.HERADataFastReader(DIFF_FILE)
diff_data, diff_flags, diff_nsamples = hd_diff.read(pols=['ee', 'nn'])

# pick all reds that might be in the data, using the same set across polarizations for easier output
reds = redcal.get_reds(hd.data_antpos, pols=['ee', 'nn'], include_autos=True)
# baselines where neither antenna is a priori flagged in the aposteriori YAML
ex_ants = set(read_a_priori_ant_flags(aposteriori_yaml_file))
possibly_unflagged_bls = [bl for bl in data if utils.split_bl(bl)[0] not in ex_ants and utils.split_bl(bl)[1] not in ex_ants]
# include both baseline orientations: bl[:2] is (ant1, ant2), bl[-2::-1] is (ant2, ant1)
possibly_unflagged_antpairs = set([ap for bl in possibly_unflagged_bls for ap in [bl[:2], bl[-2::-1]]])
# keep only redundant groups containing at least one possibly-unflagged baseline
reds = [red for red in reds if np.any([bl[0:2] in possibly_unflagged_antpairs for bl in red])]
# Load calibration solutions and gain flags (smooth_cal and abs_cal/omnical versions)
hc_smooth = io.HERACal(SMOOTH_CAL_FILE)
smooth_gains, cal_flags, _, _ = hc_smooth.read()
hc_abs = io.HERACal(ABS_CAL_FILE)
abs_gains, abs_cal_flags, _, _ = hc_abs.read()

# handle the case where the smooth_cal flags are all True, trying to maintain consistent data shapes
ALL_FLAGGED = False
if np.all([flag for flag in cal_flags.values()]):
    print('This file is entirely flagged. Proceeding with averaging and filtering using earlier flags, '
          'but the output data products will still be fully flagged.')
    ALL_FLAGGED = True
    # Likely the file was fully flagged for broadband RFI, so instead use the original flags from file_calibration
    cal_flags = abs_cal_flags
    # And if that didn't work, just make the flags all False for now (though ex_ants will still be excluded via reds)
    if np.all([flag for flag in cal_flags.values()]):
        cal_flags = {ant: np.zeros_like(cal_flags[ant]) for ant in cal_flags}
def red_average(reds, data, nsamples, gains, flags=None, cal_flags=None):
    """Calibrate and redundantly average visibilities.

    Parameters
    ----------
    reds : list of lists of baseline tuples defining the redundant groups
    data : DataContainer of visibilities to calibrate and average
    nsamples : DataContainer of per-baseline sample counts, used as averaging weights
    gains : dict mapping antenna keys to complex gain arrays
    flags : optional DataContainer of per-baseline flags (default: no flags)
    cal_flags : optional dict of per-antenna calibration flags (default: no flags)

    Returns
    -------
    red_avg_data, red_avg_flags, red_avg_nsamples : RedDataContainers keyed by
        each group's first baseline. Groups absent from the solution get
        zero-filled data, all-True flags, and zero nsamples so every output
        file has the same shape across the day.
    """
    # avoid mutable default arguments; empty dicts mean "no flags"
    if flags is None:
        flags = {}
    if cal_flags is None:
        cal_flags = {}

    # Weight each visibility by nsamples, zeroed wherever the baseline or either antenna is flagged
    wgts = datacontainer.DataContainer({bl: nsamples[bl] * ~(flags.get(bl, False) | cal_flags.get(utils.split_bl(bl)[0], False) \
                                                            | cal_flags.get(utils.split_bl(bl)[1], False)) for bl in nsamples})
    # Redundantly average the calibrated data
    sol = redcal.RedSol(reds, gains=gains)
    sol.update_vis_from_data(data, wgts=wgts)

    # Figure out redundantly averaged flags and nsamples
    red_avg_flags = {}
    red_avg_nsamples = {}
    for red in reds:
        if red[0] in sol.vis:
            # flag pixels where every baseline in the group had zero weight, or the average is non-finite
            red_avg_flags[red[0]] = np.all([wgts[bl] == 0 for bl in red], axis=0) | ~np.isfinite(sol.vis[red[0]])
            # count samples from every baseline that contributed anywhere (not per-pixel)
            red_avg_nsamples[red[0]] = np.sum([nsamples[bl] for bl in red if not np.all(wgts[bl] == 0)], axis=0)
        else:
            # empty placeholders to make sure every file has the same shape for the whole day
            sol.vis[red[0]] = np.zeros_like(next(iter(data.values())))
            red_avg_flags[red[0]] = np.ones_like(next(iter(flags.values())))
            red_avg_nsamples[red[0]] = np.zeros_like(next(iter(nsamples.values())))
    sol.make_sol_finite()

    # Build output RedDataContainers
    red_avg_data = datacontainer.RedDataContainer(sol.vis, reds)
    red_avg_flags = datacontainer.RedDataContainer(red_avg_flags, reds)
    red_avg_nsamples = datacontainer.RedDataContainer(red_avg_nsamples, reds)
    return red_avg_data, red_avg_flags, red_avg_nsamples
# perform redundant averaging for each combination of (sum, diff) x (smooth_cal, abs_cal)
red_avg_smooth_cal_sum_data, red_avg_flags, red_avg_nsamples = red_average(reds, data, nsamples, smooth_gains, flags=flags, cal_flags=cal_flags)
red_avg_smooth_cal_diff_data, _, _ = red_average(reds, diff_data, diff_nsamples, smooth_gains, flags=diff_flags, cal_flags=cal_flags)
red_avg_abs_cal_sum_data, _, _ = red_average(reds, data, nsamples, abs_gains, flags=flags, cal_flags=cal_flags)
# BUGFIX: previously this averaged diff_data with the sum file's nsamples and flags;
# use diff_nsamples/diff_flags, consistent with the smooth_cal diff average above.
# (Sum and diff nsamples/flags are presumably identical in most files -- which would
# explain why this went unnoticed -- but this is correct when they diverge.)
red_avg_abs_cal_diff_data, _, _ = red_average(reds, diff_data, diff_nsamples, abs_gains, flags=diff_flags, cal_flags=cal_flags)

if not ALL_FLAGGED:
    # integrations (times) where every baseline and channel is flagged
    integration_flags = np.all([red_avg_flags[bl] for bl in red_avg_flags], axis=(0, 2))
def plot_red_avg_vis():
    """Plot smooth-calibrated visibilities for the highest-redundancy baseline group
    in each polarization, overlaying the redundant average (Figure 1)."""
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return
    # 2x2 grid: rows are phase/amplitude, columns are ee/nn
    fig, axes = plt.subplots(2, 2, figsize=(14, 6), dpi=100, sharex='col', sharey='row', gridspec_kw={'hspace': 0, 'wspace': 0})
    for i, pol in enumerate(['ee', 'nn']):
        # restrict to this polarization; min_bl_cut=1 excludes autos
        reds_here = redcal.filter_reds(reds, pols=[pol], antpos=hd.antpos, min_bl_cut=1)
        sol = redcal.RedSol(reds_here, gains=smooth_gains)
        # pick the group with the largest median redundantly-averaged nsamples
        red = sorted(reds_here, key=lambda red: np.median(red_avg_nsamples.get(red[0], 0)), reverse=True)[0]
        if np.any([bl[2] == pol for bl in red_avg_flags]):
            # find the first integration that isn't fully flagged (tind retains the
            # last index if all integrations are flagged -- loop-variable leak by design)
            for tind in range(red_avg_flags[red[0]].shape[0]):
                if not np.all(red_avg_flags[red[0]][tind]):
                    break
            # apply smooth_cal gains per baseline, NaN-ing out flagged pixels
            # NOTE(review): assumes every bl in red with data is keyed identically;
            # a bl in red but not in data would KeyError in the plotting loop below -- TODO confirm
            calibrated = {bl: np.where(flags[bl] | cal_flags[utils.split_bl(bl)[0]] | cal_flags[utils.split_bl(bl)[1]],
                                       np.nan, sol.calibrate_bl(bl, data[bl])) for bl in red if bl in data}
            # thin colored lines: each individual baseline in the group
            for bl in red:
                axes[0, i].plot(hd.freqs/1e6, np.angle(calibrated[bl][tind]), alpha=.5, lw=.5)
                axes[1, i].semilogy(hd.freqs/1e6, np.abs(calibrated[bl][tind]), alpha=.5, lw=.5)
            # black line: the redundantly-averaged visibility, NaN where flagged
            to_plot = np.where(red_avg_flags[red[0]][tind], np.nan, red_avg_smooth_cal_sum_data[red[0]][tind])
            axes[0, i].plot(hd.freqs / 1e6, np.angle(to_plot), lw=1, c='k')
            axes[1, i].semilogy(hd.freqs / 1e6, np.abs(to_plot), lw=1, c='k', label=f'Baseline Group:\n{red[0]}')
            axes[1, i].set_xlabel('Frequency (MHz)')
            axes[1, i].legend(loc='upper right')
    axes[0, 0].set_ylabel('Visibility Phase (radians)')
    axes[1, 0].set_ylabel('Visibility Amplitude (Jy)')
    plt.tight_layout()
def plot_red_avg_nsamples():
    """Scatter-plot the median number of redundantly-averaged samples for each
    baseline group as a function of baseline vector, per polarization (Figure 2)."""
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return
    fig, axes = plt.subplots(2, 1, figsize=(14,7), dpi=100, sharex=True, gridspec_kw={'hspace': 0})
    # common color-scale maximum across both polarization panels
    max_nsamples = np.max([np.median(ns) for ns in red_avg_nsamples.values()])
    for ax, pol in zip(axes, ['ee', 'nn']):
        if np.any([bl[2] == pol for bl in red_avg_nsamples]):
            # baseline vectors (antenna-position differences) for this polarization's groups
            blvecs = np.array([hd.antpos[red[0][1]] - hd.antpos[red[0][0]] for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
            # median nsamples over not-fully-flagged integrations
            med_nsamples = np.array([np.median(red_avg_nsamples[red[0]][~integration_flags, :])
                                     for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
            # invisible scatter (s=0) first, just to fix the axis limits used to size the real markers
            sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=0)
            sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=(100 * (600 / np.diff(ax.get_xlim())[0])**2), ec='k', linewidths=.5,
                             c=med_nsamples, norm=matplotlib.colors.LogNorm(vmin=1, vmax=max_nsamples), cmap='turbo')
        ax.axis('equal')
        ax.set_xlabel('EW Baseline Vector (m)')
        ax.set_ylabel('NS Baseline Vector (m)')
        ax.text(.98, .94, f'{pol}-polarized', transform=ax.transAxes, va='top', ha='right', bbox=dict(facecolor='w', alpha=0.5))
    plt.tight_layout()
    # single shared colorbar; uses the last panel's mappable (norm is shared across panels)
    fig.colorbar(sca, ax=axes, pad=.02, label='Number of Samples')
Figure 1: Redundant Averaging of a Single Baseline Group After smooth_cal
¶
The results of calibrating and redundantly averaging the baseline group with the highest redundancy in each polarization. Visibilities with flagged antennas are excluded. The black line is the redundantly-averaged visibility. Each thin colored line is a different baseline within the group. Phases are shown in the top row, amplitudes in the bottom, ee-polarized visibilities in the left column, and nn-polarized visibilities in the right.
plot_red_avg_vis()  # render Figure 1
Figure 2: Number of Redundantly-Averaged Samples as a Function of Baseline¶
The number of visibilities averaged together in each baseline group. Note that the split of the HERA core produces more highly-sampled intra-sector baselines that are interspersed with the less-highly-sampled inter-sector baselines off the main grid.
plot_red_avg_nsamples()  # render Figure 2