Single File Delay Filtered Average Z-Score

by Josh Dillon, last updated May 12, 2024

This notebook calculates a metric used for finding low-level RFI in redundantly-averaged cross-correlations, which are delay-filtered and then incoherently averaged across well-sampled baselines.

The actual decision of which times to flag is deferred to another notebook, full_day_rfi_round_2.ipynb.

Here's a set of links to skip to particular figures and tables:

• Figure 1: z-Score Spectra for All Integrations in the File

• Figure 2: Histogram of z-Scores

In [1]:
import time
tstart = time.time()
In [2]:
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
import h5py
import hdf5plugin  # REQUIRED to have the compression plugins available
import numpy as np
import copy
import glob
from hera_cal import io, utils, redcal, apply_cal, datacontainer, vis_clean
from hera_filters import dspec
from pyuvdata import UVFlag, UVData
from scipy import constants
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import display, HTML
%matplotlib inline
In [3]:
# get input data file names
SUM_FILE = os.environ.get("SUM_FILE", None)
# SUM_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459861/zen.2459861.59008.sum.uvh5'
SUM_SUFFIX = os.environ.get("SUM_SUFFIX", 'sum.uvh5')

# get input calibration files and flags
SMOOTH_CAL_SUFFIX = os.environ.get("CAL_SUFFIX", 'sum.smooth.calfits')
SMOOTH_CAL_FILE = SUM_FILE.replace(SUM_SUFFIX, SMOOTH_CAL_SUFFIX)

# get output file suffix
ZSCORE_SUFFIX = os.environ.get("ZSCORE_SUFFIX", 'sum.red_avg_zscore.h5')
ZSCORE_OUTFILE = SUM_FILE.replace(SUM_SUFFIX, ZSCORE_SUFFIX)

# get delay filtering parameters
FM_LOW_FREQ = float(os.environ.get("FM_LOW_FREQ", 87.5)) # in MHz
FM_HIGH_FREQ = float(os.environ.get("FM_HIGH_FREQ", 108.0)) # in MHz
MIN_SAMP_FRAC = float(os.environ.get("MIN_SAMP_FRAC", .05))
FILTER_DELAY = float(os.environ.get("FILTER_DELAY", 500)) # in ns
EIGENVAL_CUTOFF = float(os.environ.get("EIGENVAL_CUTOFF", 1e-12))

for setting in ['SUM_FILE', 'SMOOTH_CAL_FILE', 'ZSCORE_OUTFILE', 'FM_LOW_FREQ', 'FM_HIGH_FREQ', 
                'MIN_SAMP_FRAC', 'FILTER_DELAY', 'EIGENVAL_CUTOFF',]:
    if isinstance(eval(setting), str):
        print(f'{setting} = "{eval(setting)}"')
    else:
        print(f'{setting} = {eval(setting)}')
SUM_FILE = "/mnt/sn1/data2/2460446/zen.2460446.34461.sum.uvh5"
SMOOTH_CAL_FILE = "/mnt/sn1/data2/2460446/zen.2460446.34461.sum.smooth.calfits"
ZSCORE_OUTFILE = "/mnt/sn1/data2/2460446/zen.2460446.34461.sum.red_avg_zscore.h5"
FM_LOW_FREQ = 87.5
FM_HIGH_FREQ = 108.0
MIN_SAMP_FRAC = 0.05
FILTER_DELAY = 500.0
EIGENVAL_CUTOFF = 1e-12

Load data, calibrate, and redundantly average

In [4]:
# Load calibration solutions and gain flags
t = time.time()
hc_smooth = io.HERACal(SMOOTH_CAL_FILE)
smooth_gains, cal_flags, _, _ = hc_smooth.read()
print(f'Finished loading smoothed calibration file in {(time.time() - t) / 60:.2f} minutes.')
Finished loading smoothed calibration file in 0.01 minutes.
In [5]:
if 'full_day_rfi_round_2' in hc_smooth.history:
    raise ValueError('It looks like the pipeline is trying to be re-run midway through. '
                     'It is strongly recommended to go back and re-run smooth_cal first to avoid state-dependent results.')
In [6]:
# handle the case where the calibration flags are all True, or effectively all True for at least one polarization, while maintaining consistent data shapes
ALL_FLAGGED = False
if np.all([flag for flag in cal_flags.values()]):
    print('This file is entirely flagged.')
    ALL_FLAGGED = True
else:
    for pol in ('Jee', 'Jnn'):
        if len([ant for ant, flag in cal_flags.items() if ant[1] == pol and not np.all(flag)]) <= 1:
            print(f'Effectively all {pol}-polarized antennas are flagged, so flagging the entire file.')
            ALL_FLAGGED = True
In [7]:
def red_average(reds, data, nsamples, gains, flags={}, cal_flags={}):    
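    """Apply gains to data and redundantly average it, weighting by nsamples and excluding flagged samples.

    Arguments (as used in this notebook):
        reds: list of lists of redundant baseline tuples, e.g. (0, 1, 'ee')
        data: DataContainer of uncalibrated visibility waterfalls
        nsamples: DataContainer of sample-count waterfalls, used as averaging weights
        gains: dictionary mapping antenna keys, e.g. (0, 'Jee'), to complex gain waterfalls
        flags: optional DataContainer of per-baseline boolean flag waterfalls
        cal_flags: optional dictionary mapping antenna keys to boolean flag waterfalls

    Returns:
        red_avg_data, red_avg_flags, red_avg_nsamples: RedDataContainers keyed by
        each redundant group's first baseline.
    """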
    # Redundantly average data
    wgts = datacontainer.DataContainer({bl: nsamples[bl] * ~(flags.get(bl, False) | cal_flags.get(utils.split_bl(bl)[0], False) \
                                                             | cal_flags.get(utils.split_bl(bl)[1], False)) for bl in nsamples})
    sol = redcal.RedSol(reds, gains=gains)
    sol.update_vis_from_data(data, wgts=wgts)
    
    # Figure out redundantly averaged flags and nsamples
    red_avg_flags = {}
    red_avg_nsamples = {}
    for red in reds:
        if red[0] in sol.vis:
            red_avg_flags[red[0]] = np.all([wgts[bl] == 0 for bl in red], axis=0) | ~np.isfinite(sol.vis[red[0]])
            red_avg_nsamples[red[0]] = np.sum([nsamples[bl] for bl in red if not np.all(wgts[bl] == 0)], axis=0)
        else:
            # empty placeholders to make sure every file has the same shape for the whole day
            sol.vis[red[0]] = np.zeros_like(next(iter(data.values())))
            red_avg_flags[red[0]] = np.ones_like(next(iter(flags.values())))
            red_avg_nsamples[red[0]] = np.zeros_like(next(iter(nsamples.values())))
    sol.make_sol_finite()
    
    # Build output RedDataContainers 
    red_avg_data = datacontainer.RedDataContainer(sol.vis, reds)
    red_avg_flags = datacontainer.RedDataContainer(red_avg_flags, reds)
    red_avg_nsamples = datacontainer.RedDataContainer(red_avg_nsamples, reds)
    return red_avg_data, red_avg_flags, red_avg_nsamples
In [8]:
if not ALL_FLAGGED:
    # Load sum and diff data
    t = time.time()
    hd = io.HERADataFastReader(SUM_FILE)
    data, flags, nsamples = hd.read(pols=['ee', 'nn'])
    print(f'Finished reading data in {(time.time() - t) / 60:.2f} minutes.')
    
    # figure out high and low bands
    low_band = slice(0, np.argwhere(hd.freqs > FM_LOW_FREQ * 1e6)[0][0])
    high_band = slice(np.argwhere(hd.freqs > FM_HIGH_FREQ * 1e6)[0][0], len(hd.freqs))
    
    # redundantly average
    t = time.time()
    reds = redcal.get_reds(hd.data_antpos, pols=['ee', 'nn'], include_autos=True)
    red_avg_data, red_avg_flags, red_avg_nsamples = red_average(reds, data, nsamples, smooth_gains, flags=flags, cal_flags=cal_flags)
    print(f'Finished redundantly averaging data in {(time.time() - t) / 60:.2f} minutes.')

    del data, nsamples, flags
Finished reading data in 0.96 minutes.
Finished redundantly averaging data in 0.21 minutes.

Delay filter redundantly-averaged data

In [9]:
if not ALL_FLAGGED:
    t = time.time()
    # compute weights based on RFI flags and autocorrelations, assuming that nsamples is constant across a spectrum
    wgts = {}
    for pol in ['ee', 'nn']:
        rfi_flags = np.all([red_avg_flags[bl] for bl in red_avg_flags if bl[2] == pol], axis=0)
        auto_bl = [bl for bl in red_avg_data if bl[0] == bl[1] and bl[2] == pol][0]
        wgts[pol] = np.where(rfi_flags, 0, np.abs(red_avg_data[auto_bl])**-2)
        wgts[pol] /= np.nanmean(np.where(rfi_flags, np.nan, wgts[pol]))  # avoid dynamic range issues
        wgts[pol][~np.isfinite(wgts[pol])] = 0
        
    # pick out baselines with enough median nsamples and light-travel times shorter than the filter delay
    min_nsamples = np.max([np.max(red_avg_nsamples[bl]) for bl in red_avg_nsamples]) * MIN_SAMP_FRAC
    bls_to_filter = [bl for bl in red_avg_data if (np.median(red_avg_nsamples[bl]) >= min_nsamples)]
    bls_to_filter = [bl for bl in bls_to_filter if np.linalg.norm(hd.antpos[bl[0]] - hd.antpos[bl[1]]) / constants.c * 1e9 < FILTER_DELAY]

    # perform delay filter
    cache = {}
    dly_filt_red_avg_data = copy.deepcopy(red_avg_data)
    for bl in bls_to_filter:
        d_mdl = np.zeros_like(dly_filt_red_avg_data[bl])
        for band in [low_band, high_band]:
            d_mdl[:, band], _, info = dspec.fourier_filter(hd.freqs[band], dly_filt_red_avg_data[bl][:, band], 
                                                           wgts=wgts[bl[2]][:, band], filter_centers=[0], 
                                                           filter_half_widths=[FILTER_DELAY / 1e9], mode='dpss_solve', 
                                                           eigenval_cutoff=[EIGENVAL_CUTOFF], suppression_factors=[EIGENVAL_CUTOFF], 
                                                           max_contiguous_edge_flags=len(hd.freqs), cache=cache)
        dly_filt_red_avg_data[bl] = np.where(red_avg_flags[bl], 0, red_avg_data[bl] - d_mdl)
    print(f'Finished delay-filtering in {(time.time() - t) / 60:.2f} minutes.')
Finished delay-filtering in 0.10 minutes.
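
To make the filtering step above more concrete, here is a minimal, self-contained sketch (not part of the original pipeline) of the same dspec.fourier_filter call on synthetic data. The frequency range, waterfall shape, tone delay, and noise level are all made up for illustration: a 200 ns tone lies within the ±500 ns filter window, so it is absorbed into the DPSS model, while most of the white noise survives in the residual.

import numpy as np
from hera_filters import dspec

freqs = np.linspace(120e6, 180e6, 256)  # Hz (hypothetical band)
rng = np.random.default_rng(42)
# (Ntimes, Nfreqs) waterfall: a 200 ns "foreground-like" tone plus white noise
data = (np.exp(2j * np.pi * freqs * 200e-9)[None, :]
        + 0.01 * (rng.standard_normal((10, 256)) + 1j * rng.standard_normal((10, 256))))
wgts = np.ones((10, 256))  # no flags in this toy example

# model all spectral structure within +/-500 ns of 0 delay using DPSS modes
d_mdl, d_res, info = dspec.fourier_filter(freqs, data, wgts=wgts, filter_centers=[0],
                                          filter_half_widths=[500e-9], mode='dpss_solve',
                                          eigenval_cutoff=[1e-12], suppression_factors=[1e-12])
# residual RMS should be near (a bit below) the ~0.014 complex noise RMS,
# since the DPSS modes absorb the tone plus a fraction of the noise
print(np.sqrt(np.mean(np.abs(data - d_mdl)**2)))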

Calculate z-scores

In [10]:
if not ALL_FLAGGED:
    t = time.time()
    # estimate how much signal loss we should expect for white noise
    filters_low = dspec.dpss_operator(hd.freqs[low_band], [0], [FILTER_DELAY / 1e9], eigenval_cutoff=[EIGENVAL_CUTOFF])[0]
    filter_frac_low = filters_low.shape[1] / filters_low.shape[0]
    filters_high = dspec.dpss_operator(hd.freqs[high_band], [0], [FILTER_DELAY / 1e9], eigenval_cutoff=[EIGENVAL_CUTOFF])[0]
    filter_frac_high = filters_high.shape[1] / filters_high.shape[0]
    
    # calculate z-scores
    zscore = {}
    for pol in ['ee', 'nn']:
        to_avg = []
        weights = []

        for bl in bls_to_filter:
            auto_bl = [k for k in red_avg_data if k[0] == k[1] and k[2] == bl[2]][0]
            if (bl[2] == pol) and (bl != auto_bl):
                # calculate predicted variance from the radiometer equation
                dt = np.median(np.diff(hd.times)) * 24 * 3600
                df = np.median(np.diff(hd.freqs)) 
                predicted_variance = (np.abs(red_avg_data[auto_bl]))**2 / red_avg_nsamples[bl] / dt / df 
                predicted_variance[:, low_band] *= (1 - filter_frac_low)
                predicted_variance[:, high_band] *= (1 - filter_frac_high)
                predicted_variance[red_avg_flags[bl]] = np.nan

                # prep for inverse variance weighting
                if np.any(np.isfinite(predicted_variance)):
                    weights.append(np.where(np.isfinite(predicted_variance) & np.isfinite(dly_filt_red_avg_data[bl]), predicted_variance**-1, 0))
                    to_avg.append(np.where(np.isfinite(predicted_variance) & np.isfinite(dly_filt_red_avg_data[bl]),  np.abs(dly_filt_red_avg_data[bl]), 0))

        # Check if one polarization is effectively entirely flagged, and if so, flag the whole file
        if len(weights) == 0:
            print(f'No unique baselines have enough {pol}-polarized samples to merit filtering. Flagging the entire file.')
            ALL_FLAGGED = True
            continue
        
        # perform inverse variance weighted average
        Wsum = np.sum(weights, axis=0)**-1
        estimator = np.einsum("mij,mij->ij", to_avg, weights) * Wsum

        # turn estimator into z-score assuming Rayleigh-distributed data (appropriate for averaging magnitudes of visibilities incoherently)
        predicted_mean = np.sum(np.array(weights)**.5, axis=0) * Wsum * (np.pi / 4)**.5
        predicted_var = (4 - np.pi) / 4 * Wsum
        zscore[pol] = (estimator - predicted_mean) / predicted_var**.5
    print(f'Finished computing z-scores in {(time.time() - t) / 60:.2f} minutes.')
divide by zero encountered in reciprocal
invalid value encountered in multiply
invalid value encountered in multiply
Finished computing z-scores in 0.03 minutes.
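
For reference, here is the statistic the cell above implements, reconstructed from the code rather than stated explicitly in the original. The delay-filtered residual of each redundantly-averaged baseline $i$ is treated as complex Gaussian noise whose total variance follows the radiometer equation, reduced by the fraction $f_{\rm filt}$ of DPSS modes removed:

$$\sigma_i^2 = \frac{|V_{\rm auto}|^2}{N_{{\rm samples},i}\,\Delta t\,\Delta\nu}\,(1 - f_{\rm filt}),$$

so its magnitude $|V_i|$ is Rayleigh distributed with $E[|V_i|] = \sqrt{\pi/4}\,\sigma_i$ and ${\rm Var}[|V_i|] = \frac{4-\pi}{4}\,\sigma_i^2$. With inverse-variance weights $w_i = \sigma_i^{-2}$, the incoherently averaged estimator and its z-score are

$$\hat{m} = \frac{\sum_i w_i |V_i|}{\sum_i w_i}, \qquad z = \frac{\hat{m} - \sqrt{\pi/4}\,\frac{\sum_i w_i^{1/2}}{\sum_i w_i}}{\sqrt{\frac{4-\pi}{4}\left(\sum_i w_i\right)^{-1}}}.$$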

Plotting Code

In [11]:
def plot_zscores():
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return    
    
    fig, axes = plt.subplots(2, 1, sharey=True, sharex=True, figsize=(12, 6), gridspec_kw={'hspace': 0})
    for ax, pol in zip(axes, ['ee', 'nn']):

        for i, jd in enumerate(hd.times):
            ax.plot(hd.freqs / 1e6, zscore[pol][i, :], label=f'JD: {jd:.6f}', alpha=.75)
        
        ax.set_ylabel(f'{pol}-polarized z-score')
    axes[0].legend()        
    axes[1].set_xlabel('Frequency (MHz)')
    plt.tight_layout()    
In [12]:
def plot_zscore_hist():
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return    
    
    plt.figure(figsize=(12, 4))
    bins = np.arange(-np.nanmax(np.abs(list(zscore.values()))) - 1, np.nanmax(np.abs(list(zscore.values()))) + 1, .1)
    hist_ee = plt.hist(np.ravel(zscore['ee']), bins=bins, density=True, label='ee-polarized z-scores', alpha=.5)
    hist_nn = plt.hist(np.ravel(zscore['nn']), bins=bins, density=True, label='nn-polarized z-scores', alpha=.5)
    plt.plot(bins, (2*np.pi)**-.5 * np.exp(-bins**2 / 2), 'k--', label='Gaussian approximate\nnoise-only distribution')
    plt.yscale('log')
    all_densities = np.concatenate([hist_ee[0][hist_ee[0] > 0], hist_nn[0][hist_nn[0] > 0]]) 
    plt.ylim(np.min(all_densities) / 2, np.max(all_densities) * 2)
    plt.legend()
    plt.xlabel('z-score')
    plt.ylabel('Density')
    plt.tight_layout()

Figure 1: z-Score Spectra for All Integrations in the File

This plot shows the z-score spectrum for each integration and for both polarizations. This is what we'll use in full_day_rfi_round_2.ipynb to further refine the flagging waterfall. Negative-going excursions near prior flag boundaries are expected: the filter can overfit the noise when it is unconstrained on one side.

In [13]:
plot_zscores()
[figure: z-score spectra, one line per integration, for ee and nn polarizations]

Figure 2: Histogram of z-Scores

This plot compares the histogram of z-scores in this file (one per polarization) to a Gaussian approximation of what one might expect from thermal noise. Without filtering, the actual distribution is a weighted sum of Rayleigh distributions. Filtering further complicates this; we approximate the signal loss as a simple fraction of the modes filtered, which would be appropriate for white noise.
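
As a quick numerical sanity check on that Rayleigh-based normalization (not part of the original notebook, and with made-up noise levels), one can verify that the z-score of pure noise comes out approximately standard normal:

import numpy as np
rng = np.random.default_rng(0)
nbls, ntrials = 50, 20000
sigma = rng.uniform(1, 3, nbls)  # hypothetical per-baseline noise std of the complex residual
# complex Gaussian noise with total variance sigma^2 (sigma/sqrt(2) per component)
noise = ((rng.standard_normal((nbls, ntrials)) + 1j * rng.standard_normal((nbls, ntrials)))
         * sigma[:, None] / np.sqrt(2))
w = sigma**-2.0  # inverse-variance weights
estimator = (w @ np.abs(noise)) / w.sum()  # incoherent average of magnitudes, per trial
predicted_mean = np.sqrt(np.pi / 4) * np.sum(w**0.5) / w.sum()
predicted_var = (4 - np.pi) / 4 / w.sum()
z = (estimator - predicted_mean) / np.sqrt(predicted_var)
print(np.mean(z), np.std(z))  # should be close to 0 and 1, respectively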

In [14]:
plot_zscore_hist()
[figure: histograms of ee and nn z-scores vs. the Gaussian noise-only expectation]

Save results

In [15]:
# save results as a UVFlag file of waterfall type and metric mode
t = time.time()
uvd = UVData()
uvd.read(SUM_FILE, read_data=False)
uvf = UVFlag(uvd, waterfall=True, mode='metric')
uvf.select(polarizations=['ee', 'nn'])
uvf.history += '\nProduced by delay_filtered_average_zscore notebook with the following environment:\n' + '=' * 65 + '\n' + os.popen('conda env export').read() + '=' * 65
if ALL_FLAGGED:
    uvf.metric_array[:, :, :] = np.nan
else:
    for pol in ['ee', 'nn']:
        uvf.metric_array[:, :, np.argwhere(uvf.polarization_array == utils.polstr2num(pol, x_orientation=uvf.x_orientation))[0][0]] = zscore[pol]
uvf.write(ZSCORE_OUTFILE, clobber=True)
print(f'Finished writing z-scores in {(time.time() - t) / 60:.2f} minutes.')
Finished writing z-scores in 0.56 minutes.
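
Downstream notebooks like full_day_rfi_round_2.ipynb can read these z-scores back as a waterfall-type, metric-mode UVFlag object. A minimal sketch, assuming the output file written above exists:

from pyuvdata import UVFlag
uvf = UVFlag(ZSCORE_OUTFILE)
print(uvf.type, uvf.mode)      # 'waterfall', 'metric'
print(uvf.metric_array.shape)  # (Ntimes, Nfreqs, Npols); all NaN if the file was entirely flagged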

Metadata

In [16]:
for repo in ['hera_cal', 'hera_qm', 'hera_filters', 'hera_notebook_templates', 'pyuvdata']:
    exec(f'from {repo} import __version__')
    print(f'{repo}: {__version__}')
hera_cal: 3.6.dev65+ge56a686
hera_qm: 2.1.4
hera_filters: 0.1.5
hera_notebook_templates: 0.1.dev734+g90f16f4
pyuvdata: 2.4.2
In [17]:
print(f'Finished execution in {(time.time() - tstart) / 60:.2f} minutes.')
Finished execution in 2.04 minutes.