Single File Delay Filtered Average Z-Score¶
by Josh Dillon, last updated May 12, 2024
This notebook is designed to calculate a metric used for finding low-level RFI in redundantly-averaged cross-correlations, which are then incoherently averaged across well-sampled baselines.
The actual decision of which times to flag is deferred to another notebook, full_day_rfi_round_2.ipynb.
Here's a set of links to skip to particular figures and tables:
• Figure 1: z-Score Spectra for All Integrations in the File¶
• Figure 2: Histogram of z-Scores¶
import time
tstart = time.time()
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
import h5py
import hdf5plugin # REQUIRED to have the compression plugins available
import numpy as np
import copy
import glob
from hera_cal import io, utils, redcal, apply_cal, datacontainer, vis_clean
from hera_filters import dspec
from pyuvdata import UVFlag, UVData
from scipy import constants
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import display, HTML
%matplotlib inline
# get input data file names
SUM_FILE = os.environ.get("SUM_FILE", None)
# SUM_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459861/zen.2459861.59008.sum.uvh5'
SUM_SUFFIX = os.environ.get("SUM_SUFFIX", 'sum.uvh5')
# get input calibration files and flags
SMOOTH_CAL_SUFFIX = os.environ.get("CAL_SUFFIX", 'sum.smooth.calfits')
SMOOTH_CAL_FILE = SUM_FILE.replace(SUM_SUFFIX, SMOOTH_CAL_SUFFIX)
# get output file suffix
ZSCORE_SUFFIX = os.environ.get("ZSCORE_SUFFIX", 'sum.red_avg_zscore.h5')
ZSCORE_OUTFILE = SUM_FILE.replace(SUM_SUFFIX, ZSCORE_SUFFIX)
# get delay filtering parameters
FM_LOW_FREQ = float(os.environ.get("FM_LOW_FREQ", 87.5)) # in MHz
FM_HIGH_FREQ = float(os.environ.get("FM_HIGH_FREQ", 108.0)) # in MHz
MIN_SAMP_FRAC = float(os.environ.get("MIN_SAMP_FRAC", .05))
FILTER_DELAY = float(os.environ.get("FILTER_DELAY", 500)) # in ns
EIGENVAL_CUTOFF = float(os.environ.get("EIGENVAL_CUTOFF", 1e-12))
for setting in ['SUM_FILE', 'SMOOTH_CAL_FILE', 'ZSCORE_OUTFILE', 'FM_LOW_FREQ', 'FM_HIGH_FREQ',
'MIN_SAMP_FRAC', 'FILTER_DELAY', 'EIGENVAL_CUTOFF',]:
if isinstance(eval(setting), str):
print(f'{setting} = "{eval(setting)}"')
else:
print(f'{setting} = {eval(setting)}')
SUM_FILE = "/mnt/sn1/data1/2460455/zen.2460455.34458.sum.uvh5" SMOOTH_CAL_FILE = "/mnt/sn1/data1/2460455/zen.2460455.34458.sum.smooth.calfits" ZSCORE_OUTFILE = "/mnt/sn1/data1/2460455/zen.2460455.34458.sum.red_avg_zscore.h5" FM_LOW_FREQ = 87.5 FM_HIGH_FREQ = 108.0 MIN_SAMP_FRAC = 0.05 FILTER_DELAY = 500.0 EIGENVAL_CUTOFF = 1e-12
Load data, calibrate, and redundantly average¶
# Load calibration solutions and gain flags
t = time.time()
hc_smooth = io.HERACal(SMOOTH_CAL_FILE)
smooth_gains, cal_flags, _, _ = hc_smooth.read()
print(f'Finished loading smoothed calibration file in {(time.time() - t) / 60:.2f} minutes.')
Finished loading smoothed calibration file in 0.02 minutes.
if 'full_day_rfi_round_2' in hc_smooth.history:
raise ValueError('It looks like the pipeline is trying to be re-run midway through. '
'It is strongly recommended to go back and re-run smooth_cal first to avoid state-dependent results.')
# handle the case where the calibration flags are all True for at least one polarization, trying to maintain consistent data shapes
ALL_FLAGGED = False
if np.all([flag for flag in cal_flags.values()]):
print('This file is entirely flagged.')
ALL_FLAGGED = True
else:
for pol in ('Jee', 'Jnn'):
if len([ant for ant, flag in cal_flags.items() if ant[1] == pol and not np.all(flag)]) <= 1:
print(f'Effectively all {pol}-polarized antennas are flagged, so flagging the entire file.')
ALL_FLAGGED = True
def red_average(reds, data, nsamples, gains, flags={}, cal_flags={}):
# Redundantly average data
wgts = datacontainer.DataContainer({bl: nsamples[bl] * ~(flags.get(bl, False) | cal_flags.get(utils.split_bl(bl)[0], False) \
| cal_flags.get(utils.split_bl(bl)[1], False)) for bl in nsamples})
sol = redcal.RedSol(reds, gains=gains)
sol.update_vis_from_data(data, wgts=wgts)
# Figure out redundantly averaged flags and nsamples
red_avg_flags = {}
red_avg_nsamples = {}
for red in reds:
if red[0] in sol.vis:
red_avg_flags[red[0]] = np.all([wgts[bl] == 0 for bl in red], axis=0) | ~np.isfinite(sol.vis[red[0]])
red_avg_nsamples[red[0]] = np.sum([nsamples[bl] for bl in red if not np.all(wgts[bl] == 0)], axis=0)
else:
# empty placeholders to make sure every file has the same shape for the whole day
sol.vis[red[0]] = np.zeros_like(next(iter(data.values())))
red_avg_flags[red[0]] = np.ones_like(next(iter(flags.values())))
red_avg_nsamples[red[0]] = np.zeros_like(next(iter(nsamples.values())))
sol.make_sol_finite()
# Build output RedDataContainers
red_avg_data = datacontainer.RedDataContainer(sol.vis, reds)
red_avg_flags = datacontainer.RedDataContainer(red_avg_flags, reds)
red_avg_nsamples = datacontainer.RedDataContainer(red_avg_nsamples, reds)
return red_avg_data, red_avg_flags, red_avg_nsamples
if not ALL_FLAGGED:
# Load sum and diff data
t = time.time()
hd = io.HERADataFastReader(SUM_FILE)
data, flags, nsamples = hd.read(pols=['ee', 'nn'])
print(f'Finished reading data in {(time.time() - t) / 60:.2f} minutes.')
# figure out high and low bands
low_band = slice(0, np.argwhere(hd.freqs > FM_LOW_FREQ * 1e6)[0][0])
high_band = slice(np.argwhere(hd.freqs > FM_HIGH_FREQ * 1e6)[0][0], len(hd.freqs))
# redundantly average
t = time.time()
reds = redcal.get_reds(hd.data_antpos, pols=['ee', 'nn'], include_autos=True)
red_avg_data, red_avg_flags, red_avg_nsamples = red_average(reds, data, nsamples, smooth_gains, flags=flags, cal_flags=cal_flags)
print(f'Finished redundantly averaging data in {(time.time() - t) / 60:.2f} minutes.')
del data, nsamples, flags
Finished reading data in 1.15 minutes.
Finished redundantly averaging data in 0.37 minutes.
Delay filter redundantly-averaged data¶
if not ALL_FLAGGED:
t = time.time()
# compute weights based on RFI flags and autocorrelations, assumes that nsamples is constant across a spectrum
wgts = {}
for pol in ['ee', 'nn']:
rfi_flags = np.all([red_avg_flags[bl] for bl in red_avg_flags if bl[2] == pol], axis=0)
auto_bl = [bl for bl in red_avg_data if bl[0] == bl[1] and bl[2] == pol][0]
wgts[pol] = np.where(rfi_flags, 0, np.abs(red_avg_data[auto_bl])**-2)
wgts[pol] /= np.nanmean(np.where(rfi_flags, np.nan, wgts[pol])) # avoid dynamic range issues
wgts[pol][~np.isfinite(wgts[pol])] = 0
# pick out baselines with enough median nsamples and light-travel times shorter than the filter delay
min_nsamples = np.max([np.max(red_avg_nsamples[bl]) for bl in red_avg_nsamples]) * MIN_SAMP_FRAC
bls_to_filter = [bl for bl in red_avg_data if (np.median(red_avg_nsamples[bl]) >= min_nsamples)]
bls_to_filter = [bl for bl in bls_to_filter if np.linalg.norm(hd.antpos[bl[0]] - hd.antpos[bl[1]]) / constants.c * 1e9 < FILTER_DELAY]
# perform delay filter
cache = {}
dly_filt_red_avg_data = copy.deepcopy(red_avg_data)
for bl in bls_to_filter:
d_mdl = np.zeros_like(dly_filt_red_avg_data[bl])
for band in [low_band, high_band]:
d_mdl[:, band], _, info = dspec.fourier_filter(hd.freqs[band], dly_filt_red_avg_data[bl][:, band],
wgts=wgts[bl[2]][:, band], filter_centers=[0],
filter_half_widths=[FILTER_DELAY / 1e9], mode='dpss_solve',
eigenval_cutoff=[EIGENVAL_CUTOFF], suppression_factors=[EIGENVAL_CUTOFF],
max_contiguous_edge_flags=len(hd.freqs), cache=cache)
dly_filt_red_avg_data[bl] = np.where(red_avg_flags[bl], 0, red_avg_data[bl] - d_mdl)
print(f'Finished delay-filtering in {(time.time() - t) / 60:.2f} minutes.')
Finished delay-filtering in 0.13 minutes.
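To illustrate the idea behind the filter above, here is a minimal numpy/scipy sketch (this is not the hera_filters.dspec implementation, and the toy frequency axis, spectrum, and flags are invented for the example): smooth, low-delay structure is modeled with a small set of DPSS modes whose half-bandwidth in delay matches FILTER_DELAY, fit by weighted least-squares with flagged channels given zero weight, and then subtracted, so that narrowband features like RFI survive in the residual.
import numpy as np
from scipy.signal.windows import dpss

freqs = np.linspace(108e6, 230e6, 1024)  # toy frequency axis in Hz (invented for this example)
df = np.median(np.diff(freqs))
tau = 500e-9  # filter half-width in delay (s), mirroring FILTER_DELAY
NW = len(freqs) * tau * df  # DPSS time-halfbandwidth product
nmodes = max(int(np.ceil(2 * NW)), 1)  # roughly 2NW modes span delays within +/- tau
modes = dpss(len(freqs), NW, Kmax=nmodes).T  # (Nfreq, Nmodes) design matrix of smooth modes

rng = np.random.default_rng(0)
spectrum = np.cos(2 * np.pi * freqs * 250e-9) + 0.1 * rng.standard_normal(freqs.size)  # smooth "foreground" + noise
spectrum[600] += 5.0  # a fake narrowband RFI spike
wgts = np.ones(freqs.size)
wgts[500:510] = 0  # pretend these channels are flagged

# weighted least-squares fit of the smooth modes, then subtract the model
sqrt_w = np.sqrt(wgts)
coeffs, *_ = np.linalg.lstsq(modes * sqrt_w[:, None], spectrum * sqrt_w, rcond=None)
residual = spectrum - modes @ coeffs  # the RFI spike remains; the smooth, low-delay structure is removed
The ratio of modes fit to channels (nmodes / len(freqs) here) is the same kind of filtered fraction used in the next section to predict how much noise power the filter removes.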
Calculate z-scores¶
if not ALL_FLAGGED:
t = time.time()
# estimate how much signal loss we should expect for white noise
filters_low = dspec.dpss_operator(hd.freqs[low_band], [0], [FILTER_DELAY / 1e9], eigenval_cutoff=[EIGENVAL_CUTOFF])[0]
filter_frac_low = filters_low.shape[1] / filters_low.shape[0]
filters_high = dspec.dpss_operator(hd.freqs[high_band], [0], [FILTER_DELAY / 1e9], eigenval_cutoff=[EIGENVAL_CUTOFF])[0]
filter_frac_high = filters_high.shape[1] / filters_high.shape[0]
# calculate z-scores
zscore = {}
for pol in ['ee', 'nn']:
to_avg = []
weights = []
for bl in bls_to_filter:
auto_bl = [k for k in red_avg_data if k[0] == k[1] and k[2] == bl[2]][0]
if (bl[2] == pol) and (bl != auto_bl):
# calculate predicted variance
dt = np.median(np.diff(hd.times)) * 24 * 3600
df = np.median(np.diff(hd.freqs))
predicted_variance = (np.abs(red_avg_data[auto_bl]))**2 / red_avg_nsamples[bl] / dt / df
predicted_variance[:, low_band] *= (1 - filter_frac_low)
predicted_variance[:, high_band] *= (1 - filter_frac_high)
predicted_variance[red_avg_flags[bl]] = np.nan
# prep for inverse variance weighting
if np.any(np.isfinite(predicted_variance)):
weights.append(np.where(np.isfinite(predicted_variance) & np.isfinite(dly_filt_red_avg_data[bl]), predicted_variance**-1, 0))
to_avg.append(np.where(np.isfinite(predicted_variance) & np.isfinite(dly_filt_red_avg_data[bl]), np.abs(dly_filt_red_avg_data[bl]), 0))
# Check if one polarization is effectively entirely flagged, and if so, flag the whole file
if len(weights) == 0:
print(f'No unique baselines have enough {pol}-polarized samples to merit filtering. Flagging the entire file.')
ALL_FLAGGED = True
continue
# perform inverse variance weighted average
Wsum = np.sum(weights, axis=0)**-1
estimator = np.einsum("mij,mij->ij", to_avg, weights) * Wsum
# turn estimator into z-score assuming Rayleigh-distributed data (appropriate for averaging magnitudes of visibilities incoherently)
predicted_mean = np.sum(np.array(weights)**.5, axis=0) * Wsum * (np.pi / 4)**.5
predicted_var = (4 - np.pi) / 4 * Wsum
zscore[pol] = (estimator - predicted_mean) / predicted_var**.5
print(f'Finished computing z-scores in {(time.time() - t) / 60:.2f} minutes.')
divide by zero encountered in reciprocal
invalid value encountered in multiply
invalid value encountered in multiply
Finished computing z-scores in 0.04 minutes.
Plotting Code¶
def plot_zscores():
if ALL_FLAGGED:
print('All integrations are flagged. Nothing to plot.')
return
fig, axes = plt.subplots(2, 1, sharey=True, sharex=True, figsize=(12, 6), gridspec_kw={'hspace': 0})
for ax, pol in zip(axes, ['ee', 'nn']):
for i, time in enumerate(hd.times):
ax.plot(hd.freqs / 1e6, zscore[pol][i, :], label=f'JD: {hd.times[i]:.6f}', alpha=.75)
ax.set_ylabel(f'{pol}-polarized z-score')
axes[0].legend()
axes[1].set_xlabel('Frequency (MHz)')
plt.tight_layout()
def plot_zscore_hist():
if ALL_FLAGGED:
print('All integrations are flagged. Nothing to plot.')
return
plt.figure(figsize=(12, 4))
bins = np.arange(-np.nanmax(np.abs(list(zscore.values()))) - 1, np.nanmax(np.abs(list(zscore.values()))) + 1, .1)
hist_ee = plt.hist(np.ravel(zscore['ee']), bins=bins, density=True, label='ee-polarized z-scores', alpha=.5)
hist_nn = plt.hist(np.ravel(zscore['nn']), bins=bins, density=True, label='nn-polarized z-scores', alpha=.5)
plt.plot(bins, (2*np.pi)**-.5 * np.exp(-bins**2 / 2), 'k--', label='Gaussian approximate\nnoise-only distribution')
plt.yscale('log')
all_densities = np.concatenate([hist_ee[0][hist_ee[0] > 0], hist_nn[0][hist_nn[0] > 0]])
plt.ylim(np.min(all_densities) / 2, np.max(all_densities) * 2)
plt.legend()
plt.xlabel('z-score')
plt.ylabel('Density')
plt.tight_layout()
Figure 1: z-Score Spectra for All Integrations in the File¶
This plot shows the z-score spectrum for each integration and for both polarizations. This is what we'll use in full_day_rfi_round_2.ipynb to further refine the flagging waterfall. Negative-going excursions near prior flag boundaries are expected: the filter can overfit the noise when it is unconstrained on one side.
plot_zscores()
Figure 2: Histogram of z-Scores¶
This plot compares the histogram of z-scores in this file (one per polarization) to a Gaussian approximation of what one might expect from thermal noise alone. Even without filtering, the true distribution is that of a weighted sum of Rayleigh-distributed visibility amplitudes, so it is only approximately Gaussian. Filtering complicates this further, and we approximate the resulting signal loss as the fraction of DPSS modes filtered, which would be appropriate for white noise.
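For reference, the normalization behind this plot (a restatement of the z-score cell above, under its stated white-noise and Rayleigh assumptions) is as follows. The predicted total visibility variance for each baseline group $b$ is

$$\sigma_b^2 = \frac{|V_{\rm auto}|^2}{N_{\rm samples}\,\Delta t\,\Delta\nu}\,(1 - f_{\rm filt}),$$

where $f_{\rm filt}$ is the fraction of DPSS modes removed in each band. If the delay-filtered residual is pure noise, its amplitude is Rayleigh-distributed with

$$\mathbb{E}\big[|V_b|\big] = \sqrt{\tfrac{\pi}{4}\,\sigma_b^2}, \qquad \mathrm{Var}\big[|V_b|\big] = \tfrac{4-\pi}{4}\,\sigma_b^2,$$

so the inverse-variance-weighted average of amplitudes over baselines, with weights $w_b = \sigma_b^{-2}$, has predicted mean and variance

$$\mu = \sqrt{\tfrac{\pi}{4}}\,\frac{\sum_b \sqrt{w_b}}{\sum_b w_b}, \qquad \sigma^2 = \frac{4-\pi}{4}\,\frac{1}{\sum_b w_b},$$

and the plotted z-score is $\left(\langle|V|\rangle_w - \mu\right)/\sigma$.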
plot_zscore_hist()
Save results¶
# save results as a UVFlag file of waterfall type and metric mode
t = time.time()
uvd = UVData()
uvd.read(SUM_FILE, read_data=False)
uvf = UVFlag(uvd, waterfall=True, mode='metric')
uvf.select(polarizations=['ee', 'nn'])
uvf.history += '\nProduced by delay_filtered_average_zscore notebook with the following environment:\n' + '=' * 65 + '\n' + os.popen('conda env export').read() + '=' * 65
if ALL_FLAGGED:
uvf.metric_array[:, :, :] = np.nan
else:
for pol in ['ee', 'nn']:
uvf.metric_array[:, :, np.argwhere(uvf.polarization_array == utils.polstr2num(pol, x_orientation=uvf.x_orientation))[0][0]] = zscore[pol]
uvf.write(ZSCORE_OUTFILE, clobber=True)
print(f'Finished writing z-scores in {(time.time() - t) / 60:.2f} minutes.')
Finished writing z-scores in 0.23 minutes.
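As a hypothetical sanity check (not part of the pipeline; it simply assumes the file was just written to ZSCORE_OUTFILE), the z-scores can be read back with pyuvdata to confirm the waterfall type and metric mode:
from pyuvdata import UVFlag
uvf_check = UVFlag(ZSCORE_OUTFILE)  # re-load the UVFlag file written above
print(uvf_check.type, uvf_check.mode)  # expect 'waterfall', 'metric'
print(uvf_check.metric_array.shape)  # expect (Ntimes, Nfreqs, Npols)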
Metadata¶
for repo in ['hera_cal', 'hera_qm', 'hera_filters', 'hera_notebook_templates', 'pyuvdata']:
exec(f'from {repo} import __version__')
print(f'{repo}: {__version__}')
hera_cal: 3.6.dev110+gcc0a13d
hera_qm: 2.1.5.dev3+gf3b4a97
hera_filters: 0.1.5
hera_notebook_templates: 0.1.dev734+g90f16f4
pyuvdata: 2.4.2
print(f'Finished execution in {(time.time() - tstart) / 60:.2f} minutes.')
Finished execution in 2.46 minutes.