Single File Post-Processing¶
by Josh Dillon, last updated July 15, 2024
This notebook, the final step in a pre-LST-binning analysis, applies calibration solutions and flags to both sum and diff data, producing a variety of data products. These currently may include:
Abs-calibrated, redundantly-averaged visibility sums
Abs-calibrated, redundantly-averaged visibility diffs
Smooth-calibrated, redundantly-averaged visibility sums
Smooth-calibrated, redundantly-averaged visibility diffs
Abs-calibrated, redundantly-averaged, delay-filtered visibility sums
Abs-calibrated, redundantly-averaged, delay-filtered visibility diffs
Smooth-calibrated, redundantly-averaged, delay-filtered visibility sums
Smooth-calibrated, redundantly-averaged, delay-filtered visibility diffs
Smooth-calibrated, coherently redundantly-averaged, incoherently array-averaged visibility magnitudes
- This is done for all visibilities, just autos, and just cross-correlations and is designed to be lightweight data product for transient searches
Some of these data products are experimental and may be removed at a later date to save compute and disk. Some may be turned off using environment variables.
Here's a set of links to skip to particular figures and tables:
• Figure 1: Redundant Averaging of a Single Baseline Group After smooth_cal
¶
• Figure 2: Number of Redundantly-Averaged Samples as a Function of Baseline¶
• Figure 3: Delay-Filtered Visibility Delay Spectra after smooth_cal
and Redundant-Averaging¶
import time
tstart = time.time()
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
import h5py
import hdf5plugin # REQUIRED to have the compression plugins available
import numpy as np
import copy
import pickle
import glob
from pyuvdata import UVFlag
from hera_cal import io, utils, redcal, apply_cal, datacontainer, vis_clean
from hera_filters import dspec
from hera_qm.metrics_io import read_a_priori_ant_flags
from hera_qm.time_series_metrics import true_stretches
from scipy import constants, signal
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import display, HTML
%matplotlib inline
Load data and calibration solutions¶
# figure out whether to save results (and which ones)
SAVE_RESULTS = os.environ.get("SAVE_RESULTS", "TRUE").upper() == "TRUE"
SAVE_DIFF_RED_AVG = os.environ.get("SAVE_DIFF_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_ABS_CAL_RED_AVG = os.environ.get("SAVE_ABS_CAL_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_DLY_FILT_RED_AVG = os.environ.get("SAVE_DLY_FILT_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_CROSS_POLS = os.environ.get("SAVE_CROSS_POLS", "FALSE").upper() == "TRUE"
# TODO: in theory, if some of these are turned off, some memory and/or processing could be saved
# by doing only a subset of what this notebook does. That's left for fuure work.
add_to_history = 'Produced by file_prostprocessing notebook with the following environment:\n' + '=' * 65 + '\n' + os.popen('conda env export').read() + '=' * 65
for setting in ['SAVE_RESULTS', 'SAVE_DIFF_RED_AVG', 'SAVE_ABS_CAL_RED_AVG', 'SAVE_DLY_FILT_RED_AVG', 'SAVE_CROSS_POLS']:
print(f'{setting} = {eval(setting)}')
SAVE_RESULTS = True SAVE_DIFF_RED_AVG = False SAVE_ABS_CAL_RED_AVG = False SAVE_DLY_FILT_RED_AVG = False SAVE_CROSS_POLS = True
# get input data file names
SUM_FILE = os.environ.get("SUM_FILE", None)
# SUM_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459861/zen.2459861.46123.sum.uvh5'
SUM_SUFFIX = os.environ.get("SUM_SUFFIX", 'sum.uvh5')
DIFF_SUFFIX = os.environ.get("DIFF_SUFFIX", 'diff.uvh5')
# get input calibration files and flags
ABS_CAL_SUFFIX = os.environ.get("ABS_CAL_SUFFIX", 'sum.omni.calfits')
SMOOTH_CAL_SUFFIX = os.environ.get("CAL_SUFFIX", 'sum.smooth.calfits')
APOSTERIORI_YAML_SUFFIX = os.environ.get("APOSTERIORI_YAML_SUFFIX", '_aposteriori_flags.yaml')
aposteriori_yaml_file = os.path.join(os.path.dirname(SUM_FILE), SUM_FILE.split('.')[-4] + APOSTERIORI_YAML_SUFFIX)
# Get filter cache name
FILTER_CACHE = os.environ.get("FILTER_CACHE", "filter_cache")
filter_cache_dir = os.path.join(os.path.dirname(SUM_FILE), FILTER_CACHE)
# Get output reds pickle name for storing the actual reds used in the redundant average
REDS_PICKLE_SUFFIX = os.environ.get("REDS_PICKLE_SUFFIX", 'reds_used.p')
# output abs-calibrated, redundantly-averaged files
SUM_ABS_CAL_RED_AVG_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_SUFFIX", 'sum.abs_calibrated.red_avg.uvh5')
DIFF_ABS_CAL_RED_AVG_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_SUFFIX", 'diff.abs_calibrated.red_avg.uvh5')
# output smooth-calibrated, redundantly-averaged files
SUM_SMOOTH_CAL_RED_AVG_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_SUFFIX", 'sum.smooth_calibrated.red_avg.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_SUFFIX", 'diff.smooth_calibrated.red_avg.uvh5')
# output abs-calibrated, redundantly-averaged, delay-filtered files
SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.abs_calibrated.red_avg.dly_filt.uvh5')
DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.abs_calibrated.red_avg.dly_filt.uvh5')
# output smooth-calibrated, redundantly-averaged, delay-filtered files
SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.smooth_calibrated.red_avg.dly_filt.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.smooth_calibrated.red_avg.dly_filt.uvh5')
# output smooth-calibrated, averaged absolute value of visibilities
AVG_ABS_ALL_SUFFIX = os.environ.get("AVG_ABS_SUFFIX", 'sum.smooth_calibrated.avg_abs_all.uvh5')
AVG_ABS_AUTO_SUFFIX = os.environ.get("AVG_ABS_AUTO_SUFFIX", 'sum.smooth_calibrated.avg_abs_auto.uvh5')
AVG_ABS_CROSS_SUFFIX = os.environ.get("AVG_ABS_CROSS_SUFFIX", 'sum.smooth_calibrated.avg_abs_cross.uvh5')
for suffix in ['DIFF_SUFFIX', 'ABS_CAL_SUFFIX', 'SMOOTH_CAL_SUFFIX', 'REDS_PICKLE_SUFFIX',
'SUM_ABS_CAL_RED_AVG_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_SUFFIX',
'SUM_SMOOTH_CAL_RED_AVG_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_SUFFIX',
'SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX',
'SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX',
'AVG_ABS_ALL_SUFFIX', 'AVG_ABS_AUTO_SUFFIX', 'AVG_ABS_CROSS_SUFFIX']:
file_var = suffix.replace('SUFFIX', 'FILE')
exec(f'{file_var} = SUM_FILE.replace(SUM_SUFFIX, {suffix})')
print(f"{file_var} = '{eval(file_var)}'")
print(f'aposteriori_yaml_file = {aposteriori_yaml_file}')
print(f'filter_cache = {filter_cache_dir}')
DIFF_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.diff.uvh5' ABS_CAL_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.sum.omni.calfits' SMOOTH_CAL_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth.calfits' REDS_PICKLE_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.reds_used.p' SUM_ABS_CAL_RED_AVG_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.sum.abs_calibrated.red_avg.uvh5' DIFF_ABS_CAL_RED_AVG_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.diff.abs_calibrated.red_avg.uvh5' SUM_SMOOTH_CAL_RED_AVG_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth_calibrated.red_avg.uvh5' DIFF_SMOOTH_CAL_RED_AVG_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.diff.smooth_calibrated.red_avg.uvh5' SUM_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.sum.abs_calibrated.red_avg.dly_filt.uvh5' DIFF_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.diff.abs_calibrated.red_avg.dly_filt.uvh5' SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth_calibrated.red_avg.dly_filt.uvh5' DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.diff.smooth_calibrated.red_avg.dly_filt.uvh5' AVG_ABS_ALL_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth_calibrated.avg_abs_all.uvh5' AVG_ABS_AUTO_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth_calibrated.avg_abs_auto.uvh5' AVG_ABS_CROSS_FILE = '/mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth_calibrated.avg_abs_cross.uvh5' aposteriori_yaml_file = /mnt/sn1/data2/2460626/2460626_aposteriori_flags.yaml filter_cache = /mnt/sn1/data2/2460626/filter_cache
# parse delay filtering settings
DLY_FILT_HORIZON = float(os.environ.get("DLY_FILT_HORIZON", 1.0))
DLY_FILT_STANDOFF = float(os.environ.get("DLY_FILT_STANDOFF", 0.0)) # in ns
DLY_FILT_MIN_DLY = float(os.environ.get("DLY_FILT_MIN_DLY", 150.0)) # in ns
DLY_FILT_EIGENVAL_CUTOFF = float(os.environ.get("DLY_FILT_EIGENVAL_CUTOFF", 1e-12))
FM_LOW_FREQ = float(os.environ.get("FM_LOW_FREQ", 87.5)) # in MHz
FM_HIGH_FREQ = float(os.environ.get("FM_HIGH_FREQ", 108.0)) # in MHz
for setting in ['DLY_FILT_HORIZON', 'DLY_FILT_STANDOFF', 'DLY_FILT_MIN_DLY', 'DLY_FILT_EIGENVAL_CUTOFF', 'FM_LOW_FREQ', 'FM_HIGH_FREQ']:
print(f'{setting} = {eval(setting)}')
DLY_FILT_HORIZON = 1.0 DLY_FILT_STANDOFF = 0.0 DLY_FILT_MIN_DLY = 150.0 DLY_FILT_EIGENVAL_CUTOFF = 1e-12 FM_LOW_FREQ = 87.5 FM_HIGH_FREQ = 108.0
# Find all cached matrices within the filter cache
cache_files = glob.glob(os.path.join(filter_cache_dir, "*.filter_cache.p"))
pols = ['nn', 'ee', 'ne', 'en'] if SAVE_CROSS_POLS else ['ee', 'nn']
# Load sum and diff data
hd = io.HERADataFastReader(SUM_FILE)
data, flags, nsamples = hd.read(pols=pols)
hd_diff = io.HERADataFastReader(DIFF_FILE)
diff_data, diff_flags, diff_nsamples = hd_diff.read(pols=pols)
# pick all reds that might be in the data, using the same set across polarizations for easier output
reds = redcal.get_reds(hd.data_antpos, pols=pols, include_autos=True, bl_error_tol=2.0)
ex_ants = set(read_a_priori_ant_flags(aposteriori_yaml_file))
possibly_unflagged_bls = [bl for bl in data if utils.split_bl(bl)[0] not in ex_ants and utils.split_bl(bl)[1] not in ex_ants]
possibly_unflagged_antpairs = set([ap for bl in possibly_unflagged_bls for ap in [bl[:2], bl[-2::-1]]])
reds = [red for red in reds if np.any([bl[0:2] in possibly_unflagged_antpairs for bl in red])]
# Load calibration solutions and gain flags
hc_smooth = io.HERACal(SMOOTH_CAL_FILE)
smooth_gains, cal_flags, _, _ = hc_smooth.read()
pol_convention = hc_smooth.pol_convention
gain_scale = hc_smooth.gain_scale
hc_abs = io.HERACal(ABS_CAL_FILE)
abs_gains, abs_cal_flags, _, _ = hc_abs.read()
assert pol_convention == hc_abs.pol_convention, f'{pol_convention} != {hc_abs.pol_convention}'
assert gain_scale == hc_abs.gain_scale, f'{gain_scale} != {hc_abs.gain_scale}'
# handle the the case where the smooth_cal flags are all True, trying to maintain consistent data shapes
ALL_FLAGGED = False
if np.all([flag for flag in cal_flags.values()]):
print('This file is entirely flagged. Proceeding with averaging and filtering using earlier flags, '
'but the output data products will still be fully flagged.')
ALL_FLAGGED = True
# Likely the file was fully flagged for broadband RFI, so instead use the original flags from file_calibration
cal_flags = abs_cal_flags
# And if that didn't work, just make the flags all False for now (though ex_ants will still be exluded via reds)
if np.all([flag for flag in cal_flags.values()]):
cal_flags = {ant: np.zeros_like(cal_flags[ant]) for ant in cal_flags}
def red_average(reds, data, nsamples, gains, flags={}, cal_flags={}):
# Redundantly average data
wgts = datacontainer.DataContainer({bl: nsamples[bl] * ~(flags.get(bl, False) | cal_flags.get(utils.split_bl(bl)[0], False) \
| cal_flags.get(utils.split_bl(bl)[1], False)) for bl in nsamples})
sol = redcal.RedSol(reds, gains=gains)
sol.update_vis_from_data(data, wgts=wgts)
# Figure out redundantly averaged flags and nsamples
red_avg_flags = {}
red_avg_nsamples = {}
for red in reds:
if red[0] in sol.vis:
red_avg_flags[red[0]] = np.all([wgts[bl] == 0 for bl in red], axis=0) | ~np.isfinite(sol.vis[red[0]])
red_avg_nsamples[red[0]] = np.sum([nsamples[bl] for bl in red if not np.all(wgts[bl] == 0)], axis=0)
else:
# empty placeholders to make sure every file has the same shape for the whole day
sol.vis[red[0]] = np.zeros_like(next(iter(data.values())))
red_avg_flags[red[0]] = np.ones_like(next(iter(flags.values())))
red_avg_nsamples[red[0]] = np.zeros_like(next(iter(nsamples.values())))
sol.make_sol_finite()
# Build output RedDataContainers
red_avg_data = datacontainer.RedDataContainer(sol.vis, reds)
red_avg_flags = datacontainer.RedDataContainer(red_avg_flags, reds)
red_avg_nsamples = datacontainer.RedDataContainer(red_avg_nsamples, reds)
return red_avg_data, red_avg_flags, red_avg_nsamples
# perform redundant averaging
red_avg_smooth_cal_sum_data, red_avg_flags, red_avg_nsamples = red_average(reds, data, nsamples, smooth_gains, flags=flags, cal_flags=cal_flags)
red_avg_smooth_cal_diff_data, _, _ = red_average(reds, diff_data, diff_nsamples, smooth_gains, flags=diff_flags, cal_flags=cal_flags)
red_avg_abs_cal_sum_data, _, _ = red_average(reds, data, nsamples, abs_gains, flags=flags, cal_flags=cal_flags)
red_avg_abs_cal_diff_data, _, _ = red_average(reds, diff_data, nsamples, abs_gains, flags=flags, cal_flags=cal_flags)
# Save the actual reds used for this redundant average
flagged_bls = [bl for bl in flags if np.all(flags[bl])]
flagged_ants = [ant for ant in cal_flags if np.all(cal_flags[ant])]
reds_used_here = redcal.filter_reds(reds, ex_bls=flagged_bls, ex_ants=flagged_ants)
if SAVE_RESULTS:
with open(REDS_PICKLE_FILE, 'wb') as reds_pickle_file:
pickle.dump(reds_used_here, reds_pickle_file)
if not ALL_FLAGGED:
integration_flags = np.all([red_avg_flags[bl] for bl in red_avg_flags], axis=(0, 2))
def plot_red_avg_vis(pols=['ee', 'nn']):
if ALL_FLAGGED:
print('All integrations are flagged. Nothing to plot.')
return
fig, axes = plt.subplots(2, 2, figsize=(14, 6), dpi=100, sharex='col', sharey='row', gridspec_kw={'hspace': 0, 'wspace': 0})
for i, pol in enumerate(pols):
reds_here = redcal.filter_reds(reds, pols=[pol], antpos=hd.antpos, min_bl_cut=1)
sol = redcal.RedSol(reds_here, gains=smooth_gains)
red = sorted(reds_here, key=lambda red: np.median(red_avg_nsamples.get(red[0], 0)), reverse=True)[0]
if np.any([bl[2] == pol for bl in red_avg_flags]):
for tind in range(red_avg_flags[red[0]].shape[0]):
if not np.all(red_avg_flags[red[0]][tind]):
break
calibrated = {bl: np.where(flags[bl] | cal_flags[utils.split_bl(bl)[0]] | cal_flags[utils.split_bl(bl)[1]],
np.nan, sol.calibrate_bl(bl, data[bl])) for bl in red if bl in data}
for bl in red:
axes[0, i].plot(hd.freqs/1e6, np.angle(calibrated[bl][tind]), alpha=.5, lw=.5)
axes[1, i].semilogy(hd.freqs/1e6, np.abs(calibrated[bl][tind]), alpha=.5, lw=.5)
to_plot = np.where(red_avg_flags[red[0]][tind], np.nan, red_avg_smooth_cal_sum_data[red[0]][tind])
axes[0, i].plot(hd.freqs / 1e6, np.angle(to_plot), lw=1, c='k')
axes[1, i].semilogy(hd.freqs / 1e6, np.abs(to_plot), lw=1, c='k', label=f'Baseline Group:\n{red[0]}')
axes[1, i].set_xlabel('Frequency (MHz)')
axes[1, i].legend(loc='upper right')
axes[0, 0].set_ylabel('Visibility Phase (radians)')
axes[1, 0].set_ylabel('Visibility Amplitude (Jy)')
plt.tight_layout()
def plot_red_avg_nsamples():
if ALL_FLAGGED:
print('All integrations are flagged. Nothing to plot.')
return
fig, axes = plt.subplots(2, 1, figsize=(14,7), dpi=100, sharex=True, gridspec_kw={'hspace': 0})
max_nsamples = np.max([np.median(ns) for ns in red_avg_nsamples.values()])
for ax, pol in zip(axes, ['ee', 'nn']):
if np.any([bl[2] == pol for bl in red_avg_nsamples]):
blvecs = np.array([hd.antpos[red[0][1]] - hd.antpos[red[0][0]] for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
med_nsamples = np.array([np.median(red_avg_nsamples[red[0]][~integration_flags, :])
for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=0)
sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=(100 * (600 / np.diff(ax.get_xlim())[0])**2), ec='k', linewidths=.5,
c=med_nsamples, norm=matplotlib.colors.LogNorm(vmin=1, vmax=max_nsamples), cmap='turbo')
ax.axis('equal')
ax.set_xlabel('EW Baseline Vector (m)')
ax.set_ylabel('NS Baseline Vector (m)')
ax.text(.98, .94, f'{pol}-polarized', transform=ax.transAxes, va='top', ha='right', bbox=dict(facecolor='w', alpha=0.5))
plt.tight_layout()
fig.colorbar(sca, ax=axes, pad=.02, label='Number of Samples')
Figure 1: Redundant Averaging of a Single Baseline Group After smooth_cal
¶
The results of calibration and redundant averaging the baseline group with the highest redundancy in each polarization. Visibilities with flagged antennas are excluded. The black line is the redundantly-averaged visibility. Each thin colored line is a different baseline group. Phases are shown in the top row, amplitudes in the bottom, ee-polarized visibilities in the left column, and nn-polarized visibilities in the right.
plot_red_avg_vis()
plot_red_avg_vis(['en', 'ne'])
Figure 2: Number of Redundantly-Averaged Samples as a Function of Baseline¶
The number of visibilities averaged together in each baseline group. Note that the split of the HERA core produces more highly-sampled intra-sector baselines that are interspersed with the less-highly-sampled inter-sector baselines off the main grid.
plot_red_avg_nsamples()
# save memory
ALL_CAL_FLAGS = np.all([np.all(~cal_flags[ant]) for ant in cal_flags])
del hd, data, flags, nsamples, hd_diff, diff_data, diff_flags, diff_nsamples
del hc_smooth, smooth_gains, cal_flags, hc_abs, abs_gains, abs_cal_flags
hd = io.HERADataFastReader(SUM_FILE)
Perform Delay Filter¶
def read_filter_cache_scratch(cache_files):
"""
Load files from a cache specified by cache_dir.
"""
# Load up the cache file with the most keys (precomputed filter matrices).
cache = {}
# loop through cache files, load them.
# If there are new keys, add them to internal cache.
# If not, delete the reference matrices from memory.
for cache_file in cache_files:
try:
cfile = open(cache_file, 'rb')
try:
cache_t = pickle.load(cfile)
finally:
cfile.close()
except:
continue
for key in cache_t:
cache[key] = cache_t[key]
return cache
# Peform delay filter separately above and below FM
low_band = slice(0, np.argwhere(hd.freqs > FM_LOW_FREQ * 1e6)[0][0])
high_band = slice(np.argwhere(hd.freqs > FM_HIGH_FREQ * 1e6)[0][0], len(hd.freqs))
to_filter = np.outer(np.ones(len(hd.times), dtype=bool), (hd.freqs <= FM_LOW_FREQ * 1e6) | (hd.freqs > FM_HIGH_FREQ * 1e6))
# set up datacontainers for delay-filtered data products
dly_filt_red_avg_smooth_cal_sum_data = copy.deepcopy(red_avg_smooth_cal_sum_data)
dly_filt_red_avg_smooth_cal_diff_data = copy.deepcopy(red_avg_smooth_cal_diff_data)
dly_filt_red_avg_abs_cal_sum_data = copy.deepcopy(red_avg_abs_cal_sum_data)
dly_filt_red_avg_abs_cal_diff_data = copy.deepcopy(red_avg_abs_cal_diff_data)
auto_bls = [bl for bl in dly_filt_red_avg_smooth_cal_sum_data if utils.split_bl(bl)[0] == utils.split_bl(bl)[1]]
gap_flag_warned = set()
new_cache_keys = []
if ALL_FLAGGED and ALL_CAL_FLAGS:
print('All integrations flagged after the file_calibration notebook... skipping delay filtering.')
else:
# Load files in cache
cache = read_filter_cache_scratch(cache_files)
cache_keys = list(cache.keys())
for bl in sorted(red_avg_flags.keys(), key=lambda bl: np.sum(red_avg_flags[bl]), reverse=True):
# inverse variance weight using smooth_calibrated autocorrealtions (which are proprotional to std of noise)
auto_bl1 = [abl for abl in auto_bls if abl[2] == utils.join_pol(utils.split_pol(bl[2])[0], utils.split_pol(bl[2])[0])][0]
auto_bl2 = [abl for abl in auto_bls if abl[2] == utils.join_pol(utils.split_pol(bl[2])[1], utils.split_pol(bl[2])[1])][0]
wgts = np.where(red_avg_flags[bl], 0, np.abs(red_avg_smooth_cal_sum_data[auto_bl1] * red_avg_smooth_cal_sum_data[auto_bl2])**-1)
wgts /= np.nanmean(np.where(red_avg_flags[bl], np.nan, wgts)) # avoid dynamic range issues
wgts[~np.isfinite(wgts)] = 0
# calculate filter properties
bl_vec = (hd.antpos[bl[1]] - hd.antpos[bl[0]])
bl_len = np.linalg.norm(bl_vec[:2]) / constants.c
filter_centers, filter_half_widths = vis_clean.gen_filter_properties(ax='freq', min_dly=DLY_FILT_MIN_DLY, horizon=DLY_FILT_HORIZON,
standoff=DLY_FILT_STANDOFF, bl_len=bl_len)
for dc in [dly_filt_red_avg_smooth_cal_sum_data, dly_filt_red_avg_smooth_cal_diff_data,
dly_filt_red_avg_abs_cal_sum_data, dly_filt_red_avg_abs_cal_diff_data]:
if np.all(dc[bl] == 0.0) and np.all(red_avg_flags[bl]):
# don't bother delay filtering all 0s.
continue
# run delay filter for each band individually
d_mdl = np.zeros_like(dc[bl])
for band in [low_band, high_band]:
d_mdl[:, band], _, info = dspec.fourier_filter(hd.freqs[band], dc[bl][:, band], wgts=wgts[:, band],
filter_centers=filter_centers, filter_half_widths=filter_half_widths,
mode='dpss_solve', ridge_alpha=0, fit_intercept=False,
eigenval_cutoff=[DLY_FILT_EIGENVAL_CUTOFF],
suppression_factors=[DLY_FILT_EIGENVAL_CUTOFF],
max_contiguous_edge_flags=len(hd.freqs), cache=cache)
# Track new DPSS-filters entering the cache
fourier_filter_key = dspec._fourier_filter_hash(
filter_centers=filter_centers, filter_half_widths=filter_half_widths, filter_factors=[0.], x=hd.freqs[band], w=None,
crit_name='eigenval_cutoff', label='dpss_operator', crit_val=tuple([DLY_FILT_EIGENVAL_CUTOFF])
)
if fourier_filter_key not in cache_keys:
new_cache_keys.append(fourier_filter_key)
if (bl[0] == bl[1]):
# inpaint autos at delay-filter delay
dc[bl] = np.where(red_avg_flags[bl], d_mdl, dc[bl])
else:
# for all other baselines, filter foregrounds
dc[bl] = np.where(red_avg_flags[bl], 0, dc[bl] - d_mdl)
Mean of empty slice
def plot_delay_spectra():
if ALL_FLAGGED:
print('All integrations are flagged. Nothing to plot.')
return
# loop over bands
red_avg_flag_waterfall = np.all(list(red_avg_flags.values()), axis=0)
unflagged_inds = np.squeeze(np.argwhere(~np.any(red_avg_flag_waterfall[~np.all(red_avg_flag_waterfall, axis=1), :], axis=0)))
low_band = slice(unflagged_inds[0], np.max(unflagged_inds[hd.freqs[unflagged_inds] < FM_LOW_FREQ * 1e6]) + 1)
high_band = slice(np.min(unflagged_inds[hd.freqs[unflagged_inds] > FM_HIGH_FREQ * 1e6]), unflagged_inds[-1] + 1)
for band in [low_band, high_band]:
display(HTML(f'<h2>Redundantly-Averaged, Smooth-Calibrated Delay Spectra: {hd.freqs[band][0] / 1e6:.2f} — {hd.freqs[band][-1] / 1e6:.2f} MHz</h2>'))
fig, axes = plt.subplots(5, 2, figsize=(14,10), sharex=True, sharey='row', gridspec_kw={'hspace': 0, 'wspace': 0})
mins, maxes = {}, {}
for bl in red_avg_smooth_cal_sum_data:
# figure out whether to plot this baseline
bl_vec = (hd.antpos[bl[1]] - hd.antpos[bl[0]])
if not ((np.abs(bl_vec[1]) < 1) and int(np.round(bl_vec[0] / 14.6)) in [0, 1, 2, 4, 8]):
continue
# compute filter size
bl_len = np.linalg.norm(bl_vec[:2]) / constants.c
_, dly_filter_half_widths = vis_clean.gen_filter_properties(ax='freq', horizon=DLY_FILT_HORIZON, standoff=DLY_FILT_STANDOFF,
min_dly=DLY_FILT_MIN_DLY, bl_len=bl_len)
# figure out where to put the plot
row = {0: 0, 1: 1, 2: 2, 4: 3, 8: 4}[int(np.round(bl_vec[0] / 14.6))]
col = int(bl[2] == 'nn')
ax = axes[row, col]
# FFT delay-filtered sums
dfft_df, delays = vis_clean.fft_data(dly_filt_red_avg_smooth_cal_sum_data[bl][:, band], delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
ax.semilogy(delays*1e9, np.mean(np.abs(dfft_df[~integration_flags, :]), axis=0), ls='--', label=f'Delay-Filtered Sum Data')
# FFT diffs
dfft_ddf, _ = vis_clean.fft_data(np.where(red_avg_flags[bl], 0, red_avg_smooth_cal_diff_data[bl])[:, band],
delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
ax.semilogy(delays*1e9, np.mean(np.abs(dfft_ddf[~integration_flags, :]), axis=0), ls=':', label=f'Diff Data')
mins[bl] = np.median(np.abs(dfft_ddf[~integration_flags, :])) / 5
try:
maxes[bl] = np.max(np.abs(dfft_ip[~integration_flags, :])) * 2
except:
maxes[bl] = np.max(np.abs(dfft_df[~integration_flags, :])) * 2
ax.axvline(1e9 * dly_filter_half_widths[0], c='k', ls='--', lw=1, alpha=.5, label=f'Delay Filter Delay')
ax.axvline(-1e9 * dly_filter_half_widths[0], c='k', ls='--', lw=1, alpha=.5)
if row == col == 0:
ax.legend(loc='upper left')
ax.set_xlim([-2250, 2250])
vec = f'{bl[2]}-polarized\n{bl_vec[0]:.1f} m East\n{bl_vec[1]:.1f} m North'
ax.text(.97, .9, vec, transform=ax.transAxes, va='top', ha='right', bbox=dict(facecolor='w', alpha=0.5))
if col == 0:
ax.set_ylabel('$|\\widetilde{V}|$')
ax.set_xlabel('Delay (ns)')
for ax in axes[1:, :].flatten():
ax.set_ylim(np.min([mins[bl] for bl in mins if bl[0] != bl[1]]),
np.max([maxes[bl] for bl in mins if bl[0] != bl[1]]))
for ax in axes[0, :]:
ax.set_ylim(np.min([mins[bl] for bl in mins if bl[0] == bl[1]]),
np.max([maxes[bl] for bl in mins if bl[0] == bl[1]]))
plt.tight_layout()
plt.show()
Figure 3: Delay-Filtered Visibility Delay Spectra after smooth_cal
and Redundant Averaging¶
This figure plots delay spectra for autocorrelations and 1, 2, 4, and 8 unit East-west baselines. The delay-spectra are computed by FFTing the visibilities with a Blackman-Harris taper and then taking the absolute value. The delay spectra of visibility sums are delay-filtered. The delay spectra of visibility diffs are generated by simply setting their flagged channels to 0. For autocorrelations, only inpainting is performed. These delay spectra are plotted for both polarizations and in both the high (above FM) and low (below FM) bands. Note that the frequency ranges over which the delay-filtering was performed might be slightly larger than bands shown here since flagged channels at band edges were excluded from the FFT.
plot_delay_spectra()
Redundantly-Averaged, Smooth-Calibrated Delay Spectra: 46.92 — 87.33 MHz
Redundantly-Averaged, Smooth-Calibrated Delay Spectra: 108.20 — 233.93 MHz
Save redundantly averaged visbilities¶
if SAVE_RESULTS:
hd_out = io.HERAData(SUM_FILE)
hd_out.read(bls=red_avg_flags.keys())
hd_out.empty_arrays()
hd_out.history += add_to_history
hd_out.update(flags=red_avg_flags, nsamples=red_avg_nsamples)
hd_out.pol_convention = pol_convention
hd_out.vis_units = gain_scale
if ALL_FLAGGED:
# put back in all the flags that we had been ignoring up to this point
hd_out.flag_array = np.ones_like(hd_out.flag_array)
def _write(dc, outfile, *args):
if not np.all(args):
return
hd_out.update(data=dc)
print(f'Now writing redundantly-averaged calibrated visibilities to {outfile}')
hd_out.write_uvh5(outfile, clobber=True, fix_autos=True)
# write delay-filtered file(s)
_write(red_avg_smooth_cal_sum_data, SUM_SMOOTH_CAL_RED_AVG_FILE)
_write(red_avg_smooth_cal_diff_data, DIFF_SMOOTH_CAL_RED_AVG_FILE, SAVE_DIFF_RED_AVG)
_write(red_avg_abs_cal_sum_data, SUM_ABS_CAL_RED_AVG_FILE, SAVE_ABS_CAL_RED_AVG)
_write(red_avg_abs_cal_diff_data, DIFF_ABS_CAL_RED_AVG_FILE, SAVE_ABS_CAL_RED_AVG, SAVE_DIFF_RED_AVG)
_write(dly_filt_red_avg_smooth_cal_sum_data, SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG)
_write(dly_filt_red_avg_smooth_cal_diff_data, DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_DIFF_RED_AVG)
_write(dly_filt_red_avg_abs_cal_sum_data, SUM_ABS_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_ABS_CAL_RED_AVG)
_write(dly_filt_red_avg_abs_cal_diff_data, DIFF_ABS_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_ABS_CAL_RED_AVG, SAVE_DIFF_RED_AVG)
Now writing redundantly-averaged calibrated visibilities to /mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth_calibrated.red_avg.uvh5
Save incoherent average magnitude waterfalls¶
for mode in ['autos', 'crosses', 'all visibiltiies']:
avg_abs_data, avg_flags, avg_nsamples = {}, {}, {}
for pol in ['ee', 'nn']:
bls_to_avg = []
if mode == 'autos' or mode == 'all visibiltiies':
key = (hd.data_ants[0], hd.data_ants[0], pol)
bls_to_avg += [bl for bl in red_avg_smooth_cal_sum_data if bl[0] == bl[1] and bl[2] == pol]
if mode == 'crosses' or mode == 'all visibiltiies':
key = (hd.data_ants[0], hd.data_ants[1], pol)
bls_to_avg += [bl for bl in red_avg_smooth_cal_sum_data if bl[0] != bl[1] and bl[2] == pol]
if len(bls_to_avg) > 0:
weights = np.array([red_avg_nsamples[bl] * (~red_avg_flags[bl]) for bl in bls_to_avg])
avg_flags[key] = np.all(weights == 0, axis=0)
weights = np.array([np.where(avg_flags[key], red_avg_nsamples[bl],
red_avg_nsamples[bl] * (~red_avg_flags[bl])) for bl in bls_to_avg])
avg_nsamples[key] = np.sum([red_avg_nsamples[bl] for bl in bls_to_avg], axis=0)
avg_abs_data[key] = np.average([np.abs(red_avg_smooth_cal_sum_data[bl]) for bl in bls_to_avg], weights=weights, axis=0)
if SAVE_RESULTS:
hd_out = io.HERAData(SUM_FILE)
hd_out.read(bls=set([bl[0:2] for bl in avg_abs_data]), polarizations=['ee', 'nn'])
hd_out.empty_arrays()
hd_out.update(data=avg_abs_data, flags=avg_flags, nsamples=avg_nsamples)
hd_out.pol_convention = pol_convention
hd_out.vis_units = gain_scale
if ALL_FLAGGED:
# put back in all the flags that we had been ignoring up to this point
hd_out.flag_array = np.ones_like(hd_out.flag_array)
hd_out.history += add_to_history
outfile = {'autos': AVG_ABS_AUTO_FILE, 'crosses': AVG_ABS_CROSS_FILE, 'all visibiltiies': AVG_ABS_ALL_FILE}[mode]
print(f'Writing averaged absolute of {mode} to {outfile}')
hd_out.write_uvh5(outfile, clobber=True, fix_autos=True)
Writing averaged absolute of autos to /mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth_calibrated.avg_abs_auto.uvh5
Writing averaged absolute of crosses to /mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth_calibrated.avg_abs_cross.uvh5
Writing averaged absolute of all visibiltiies to /mnt/sn1/data2/2460626/zen.2460626.45933.sum.smooth_calibrated.avg_abs_all.uvh5
Save filter cache¶
if SAVE_RESULTS:
# Create cache directory if it doesn't already exist
if not os.path.exists(filter_cache_dir):
os.mkdir(filter_cache_dir)
# For each new cache key save a new cache file in the cache directory
for key in new_cache_keys:
cache_file_basename = key[0] + '.filter_cache.p'
if not os.path.exists(os.path.join(filter_cache_dir, cache_file_basename)):
try:
cfile = open(os.path.join(filter_cache_dir, cache_file_basename), 'wb')
try:
pickle.dump({key: cache[key]}, cfile)
finally:
cfile.close()
except (OSError, IOError):
# File could not be written to, move on
pass
Metadata¶
for repo in ['pyuvdata', 'hera_cal', 'hera_qm', 'hera_filters', 'hera_notebook_templates']:
exec(f'from {repo} import __version__')
print(f'{repo}: {__version__}')
pyuvdata: 3.0.1.dev70+g283dda3 hera_cal: 3.6.2.dev110+g0529798 hera_qm: 2.2.0 hera_filters: 0.1.6.dev1+g297dcce
hera_notebook_templates: 0.1.dev936+gdc93cad
print(f'Finished execution in {(time.time() - tstart) / 60:.2f} minutes.')
Finished execution in 12.81 minutes.