"""This module contain methods for QE-related analytical predictions on data or filtering with inhomogeneous noise
"""
import healpy as hp
import numpy as np
from plancklens import utils, nhl, qresp
from plancklens.helpers import cachers
def _read_map(m):
if isinstance(m, str):
return hp.read_map(m)
return m
[docs]def get_patchy_N0s(qekey_in, npatches, pixivmap_t, pixivmap_p, cls_unl, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf,
rvmap_uKamin_t_data=None, rvmap_uKamin_p_data=None, joint_TP=False,
nlevt_fid=None, nlevp_fid=None, cacher=cachers.cacher_mem(), source='p'):
"""Collects the effective reconstruction noise levels for different filtering and spectrum weighting schemes
Args:
qekey_in: QE anisotroy key
npatches: the variance map will be split into this number of regions of equal sky areas
pixivmap_t: inverse temperature noise pixel variance map
pixivmap_p: inverse polarization noise pixel variance map
cls_unl: unlensed CMB dict
cls_cmb_dat: CMB spectra dict entering the data maps
cls_cmb_filt: CMB spectra dict entering the filtering steps
cls_weight: CMB spectra dict entering the QE weights (numerators)
lmin_ivf: minimal CMB mutlipole
lmax_ivf: maximal CMB multipole
lmax_qlm: maximal QE multipole
transf: CMB transfer function cl
rvmap_uKamin_t_data (optional): set this to the data temperature noise map (in uK amin), if different from the one defining the filtering
rvmap_uKamin_p_data (optional): set this to the data polarisation noise map (in uK amin), if different from the one defining the filtering
joint_TP: set this to true if temperature and polarization are jointly filtered before building the QE
nlevt_fid: set this to the fiducial temperature noise value to use for the single full-sky normalization
nlevp_fid: set this to the fiducial polarisation noise value to use for the single full-sky normalization
cacher: can use this to store results (descriptors only use the noise levels, joint_TP and qe_keys though)
source: anistropy source for the responses calculations
Returns:
N0s: a dict of N0 arrays for different filtering and spectr weighting types
MCcorr: prediction of the Monte-Carlo correction of the spectrum for inhom. filtering
cMCcorr: Same for the cross-spectrum to the true lensing
"""
assert qekey_in[0] in ['p', 'x'], 'fix curl fiducial and MC correction'
qe_key = 'p' + qekey_in[1:]
nlevst_ftl, nlevst_data, _nlevt_fid, fskiest, masks = mk_patches(npatches, pixivmap_t, rvmap_uKamin_data=rvmap_uKamin_t_data)
nlevsp_ftl, nlevsp_data, _nlevp_fid, fskiesp, masks = mk_patches(npatches, pixivmap_p, rvmap_uKamin_data=rvmap_uKamin_p_data)
if nlevt_fid is None: nlevt_fid = _nlevt_fid
if nlevp_fid is None: nlevp_fid = _nlevp_fid
assert np.allclose(fskiest, fskiesp, atol=1e-6), (np.array(fskiest)-np.array(fskiesp), fskiesp)
fskies = fskiest
cpp = cls_unl['pp'][:lmax_qlm+1]
rid = 0 if qekey_in[0] == 'p' else 1
if qekey_in[0] == 'x':
cpp *= 0.
rfid = get_responses(qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf, [nlevt_fid], [nlevp_fid],
joint_TP=joint_TP, cacher=cacher, source=source)[0]
resps = get_responses(qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf, nlevst_ftl, nlevsp_ftl,
joint_TP=joint_TP, cacher=cacher, source=source)
nhls_pds = get_nhls(qe_key, qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf, nlevst_ftl, nlevst_data, nlevsp_ftl, nlevsp_data,
joint_TP=joint_TP,cacher=cacher)
nhls_fds = get_nhls(qe_key, qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf, [nlevt_fid] * npatches, nlevst_data, [nlevp_fid] * npatches, nlevsp_data,
joint_TP=joint_TP,cacher=cacher)
labels = ['hom-filt, no-rew', 'hom-filt, mv-rew', 'inhom-filt, no-rew', 'inhom-filt, mv-rew']
N0s = {q: np.zeros(lmax_qlm + 1, dtype=float) for q in labels}
MCcorr_vmap = np.zeros(lmax_qlm + 1, dtype=float)
cMCcorr_vmap = np.zeros(lmax_qlm + 1, dtype=float)
fsky_tot = np.sum(fskies)
rfidi = utils.cli(rfid[rid])
for i, (fsky, resp, nhl_pd, nhl_fd) in enumerate(zip(fskies, resps, nhls_pds, nhls_fds)):
fp_f = fsky / fsky_tot
Rp_Rf = resp[rid] * rfidi
N0s['hom-filt, no-rew'] += fp_f * (cpp + nhl_fd[rid] * rfidi ** 2) ** 2
# : spectrum of homo. filtered map without any spectra reweighting
N0s['inhom-filt, no-rew'] += fp_f * (Rp_Rf ** 2 * cpp + nhl_pd[rid] * rfidi ** 2) ** 2
# : spectrum of inhomo. filtered map without any spectra reweighting
N0s['hom-filt, mv-rew'] += fp_f * utils.cli((cpp + nhl_fd[rid] * rfidi ** 2) ** 2)
# : inverse variance weighting of homog filtered map
N0s['inhom-filt, mv-rew'] += fp_f * utils.cli((cpp + nhl_pd[rid] * rfidi ** 2 * utils.cli(Rp_Rf ** 2)) ** 2)
# : inverse variance weighting of inhomog filtered map
MCcorr_vmap += fp_f * Rp_Rf ** 2
cMCcorr_vmap += fp_f * Rp_Rf
N0s['hom-filt, mv-rew'] = utils.cli(N0s['hom-filt, mv-rew'])
N0s['inhom-filt, mv-rew'] = utils.cli(N0s['inhom-filt, mv-rew'])
N0s['inhom-filt, no-rew'] *= utils.cli(MCcorr_vmap ** 2)
for spec in N0s.values():
spec[:] = np.sqrt(spec) - cpp
return N0s, MCcorr_vmap, cMCcorr_vmap
[docs]def mk_patches(Np, pix_ivmap, rvmap_uKamin_data=None, ret_masks=False):
"""Splits the variance maps into equal-area regions with different noise levels
Args:
Np: desired number of patches
pix_ivmap: input inverse pixel variance map used for the filtering
rvmap_uKamin_data: root variance map in uK amin of the data (if different from pix_ivmap)
ret_masks: returns the defined series of masks if set
"""
mask = _read_map(pix_ivmap) > 0
npix = mask.size
nside = hp.npix2nside(npix)
vmap = utils.cli(np.sqrt(_read_map(pix_ivmap))) * np.sqrt(hp.nside2pixarea(nside)) / np.pi * 60 * 180.
edges = np.percentile(vmap[np.where(mask)], np.linspace(0, 100, Np + 1))
edges[0] = -1.
edges[-1] = 10000
nlevs = [] # from filtering variance map
nlevs_data = [] # from data variance map
fskies = []
masks = []
for i in range(1, Np + 1):
this_mask = (vmap > edges[i - 1]) & (vmap <= edges[i])
nlevs.append(np.mean(vmap[mask * this_mask]))
fskies.append(np.mean(mask * this_mask))
if rvmap_uKamin_data is not None:
nlevs_data.append(np.mean(_read_map(rvmap_uKamin_data[mask * this_mask])))
if ret_masks:
masks.append(this_mask * mask)
if rvmap_uKamin_data is None:
nlevs_data = nlevs
nlev_fid = np.sqrt(4. * np.pi / npix / np.sum(_read_map(pix_ivmap)) * np.sum(mask)) * 180. * 60. / np.pi
for nf, nd in zip(nlevs, nlevs_data):
print('%.2f (ftl) %.2f (dat) uKamin' % (nf, nd))
print('%.2f (fid)' %nlev_fid)
return nlevs, nlevs_data, nlev_fid, fskies, masks
def get_nlev_fid(pix_ivmap):
mask = _read_map(pix_ivmap) > 0
nlev_fid = np.sqrt(4. * np.pi / mask.size / np.sum(_read_map(pix_ivmap)) * np.sum(mask)) * 180. * 60. / np.pi
return nlev_fid
def get_fal(a, cl_len, nlev, transf, lmin, lmax):
"""Simple diagonal isotropic filter
"""
fal = utils.cli(cl_len.get(a + a)[:lmax + 1] + (nlev / 60. / 180. * np.pi) ** 2 / transf[:lmax + 1] ** 2)
fal[:lmin] *= 0.
return fal
[docs]def get_ivf_cls(cls_cmb_dat, cls_cmb_filt, lmin, lmax, nlevt_f, nlevp_f, nlevt_m, nlevp_m, transf, jt_tp=False):
"""inverse filtered spectra (spectra of Cov^-1 X) for CMB inverse-variance filtering
Args:
cls_cmb_dat: dict of cmb cls of the data maps
cls_cmb_filt: dict of cmb cls used in the filtering matrix
lmin: minimum multipole considered
lmax: maximum multipole considered
nlevt_f: fiducial temperature noise level used in the filtering in uK-amin
nlevp_f: fiducial polarization noise level used in the filtering in uK-amin
nlevt_m: temperature noise level of the data in uK-amin
nlevp_m: polarization noise level of the data in uK-amin
transf: CMB transfer function
jt_tp: if set joint temperature-polarization filtering is performed. If not they are filtered independently
Returns:
dict of inverse-variance filtered maps spectra (for N0 calcs.)
dict of filtering matrix spectra (for response calcs. This has no dependence on the data parts of the inputs)
"""
ivf_cls = {}
if not jt_tp:
filt_cls_i = {}
for a in ['t']:
ivf_cls[a + a] = get_fal(a, cls_cmb_filt, nlevt_f, transf, lmin, lmax) ** 2 * utils.cli(get_fal(a, cls_cmb_dat, nlevt_m, transf, 0, lmax))
filt_cls_i[a + a] = get_fal(a, cls_cmb_filt, nlevt_f, transf, lmin, lmax)
for a in ['e', 'b']:
ivf_cls[a + a] = get_fal(a, cls_cmb_filt, nlevp_f, transf, lmin, lmax) ** 2 * utils.cli(get_fal(a, cls_cmb_dat, nlevp_m, transf, 0, lmax))
filt_cls_i[a + a] = get_fal(a, cls_cmb_filt, nlevp_f, transf, lmin, lmax)
ivf_cls['te'] = cls_cmb_dat['te'][:lmax + 1] * get_fal('e', cls_cmb_filt, nlevp_f, transf, lmin, lmax) * get_fal('t', cls_cmb_filt, nlevt_f, transf, lmin, lmax)
return ivf_cls, filt_cls_i
else:
filt_cls = np.zeros((3, 3, lmax + 1), dtype=float)
dat_cls = np.zeros((3, 3, lmax + 1), dtype=float)
filt_cls[0, 0] = utils.cli(get_fal('t', cls_cmb_filt, nlevt_f, transf, lmin, lmax))
filt_cls[1, 1] = utils.cli(get_fal('e', cls_cmb_filt, nlevp_f, transf, lmin, lmax))
filt_cls[2, 2] = utils.cli(get_fal('b', cls_cmb_filt, nlevp_f, transf, lmin, lmax))
filt_cls[0, 1, lmin:] = cls_cmb_filt['te'][lmin:lmax + 1]
filt_cls[1, 0, lmin:] = cls_cmb_filt['te'][lmin:lmax + 1]
dat_cls[0, 0] = utils.cli(get_fal('t', cls_cmb_dat, nlevt_m, transf, lmin, lmax))
dat_cls[1, 1] = utils.cli(get_fal('e', cls_cmb_dat, nlevp_m, transf, lmin, lmax))
dat_cls[2, 2] = utils.cli(get_fal('b', cls_cmb_dat, nlevp_m, transf, lmin, lmax))
dat_cls[0, 1, lmin:] = cls_cmb_dat['te'][lmin:lmax + 1]
dat_cls[1, 0, lmin:] = cls_cmb_dat['te'][lmin:lmax + 1]
filt_cls_i = np.linalg.pinv(filt_cls.swapaxes(0, 2)).swapaxes(0, 2)
return cls_dot(filt_cls_i, dat_cls, lmin, lmax), \
{'tt':filt_cls_i[0,0], 'ee':filt_cls_i[1, 1], 'bb':filt_cls_i[2, 2], 'te':filt_cls_i[0, 1]}
def cls_dot(cls_fidi, cls_dat, lmin, lmax):
zro = np.zeros(lmax + 1, dtype=float)
ret = {'tt':zro.copy(), 'te':zro.copy(), 'ee':zro.copy(), 'bb':zro.copy()}
for i in range(3):
for j in range(3):
ret['tt'] += cls_fidi[0, i] * cls_fidi[0, j] * cls_dat[i, j]
ret['te'] += cls_fidi[0, i] * cls_fidi[1, j] * cls_dat[i, j]
ret['ee'] += cls_fidi[1, i] * cls_fidi[1, j] * cls_dat[i, j]
ret['bb'] += cls_fidi[2, i] * cls_fidi[2, j] * cls_dat[i, j]
for cl in ret.values():
cl[:lmin] *= 0
return ret
[docs]def get_responses(qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin, lmax, lmax_qlm, transf, nlevts_filt, nlevps_filt,
joint_TP=False, cacher=cachers.cacher_mem(), source='p'):
"""Collects estimator responses for a list of filtering noise levels
Args:
qe_key: QE estimator key
cls_cmb_dat: CMB cls of the data maps
cls_cmb_filt: CMB cls used for the filtering
cls_weight: CMB cls in the QE weights
lmin: minimum CMB multipole considered
lmax: maximum CMB multipole considered
lmax_qlm: QE output lmax
transf: CMB transfer function
nlevts_filt: list or array of filtering temperature noise levels
nlevps_filt: list or array of filtering polarization noise levels
joint_TP: uses joint temperature and polarization filtering if set, separate if not
cacher: can be used to store results
source: QE response anisotropy source (defaults to lensing)
Returns:
lists of responses (GG, CC, GC CG for spin-weight QE)
Note:
Results may be stored with the cacher but only the filtering noise levels, QE keys and joint_TP are differentiated in the filename
"""
resps = []
for i, (nlevt_f, nlevp_f) in utils.enumerate_progress(list(zip(nlevts_filt, nlevps_filt)), 'collecting responses'):
fname = 'vmapresps%s_%s_%s' % ('jTP' * joint_TP, qe_key, qe_key) + utils.clhash(np.array([nlevt_f, nlevp_f]))
if not cacher.is_cached(fname):
cls_filt_i = get_ivf_cls(cls_cmb_dat, cls_cmb_filt, lmin, lmax, nlevt_f, nlevp_f, nlevt_f, nlevp_f, transf, jt_tp=joint_TP)[1]
this_resp = qresp.get_response(qe_key, lmax, source, cls_weight, cls_cmb_dat, cls_filt_i, lmax_qlm=lmax_qlm)
cacher.cache(fname, this_resp)
resps.append(cacher.load(fname))
return resps
[docs]def get_nhls(qe_key1, qe_key2, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin, lmax, lmax_qlm, transf, nlevts_filt, nlevts_map, nlevps_filt, nlevps_map,
joint_TP=False, cacher=cachers.cacher_mem()):
"""Collects unnormalized estimator noise levels for a list of filtering noise levels and data map noise levels
Args:
qe_key1: first QE estimator key
qe_key2: second QE estimator key
cls_cmb_dat: CMB cls of the data maps
cls_cmb_filt: CMB cls used for the filtering
cls_weight: CMB cls in the QE weights
lmin: minimum CMB multipole considered
lmax: maximum CMB multipole considered
lmax_qlm: QE output lmax
transf: CMB transfer function
nlevts_filt: list or array of filtering temperature noise levels
nlevts_map: list or array of data map temperature noise levels
nlevps_filt: list or array of filtering polarization noise levels
nlevps_map: list or array of data maptemperature noise levels
joint_TP: uses joint temperature and polarization filtering if set, separate if not
cacher: can be used to store results
Returns:
lists of reconstruction noise levels (GG, CC, GC CG for spin-weight QE)
Note:
Results may be stored with the cacher but only the filtering and data noise levels, QE keys and joint_TP are differentiated in the filename
"""
Nhls = []
for i, (nlevt_f, nlevt_m, nlevp_f, nlevp_m) in utils.enumerate_progress(list(zip(nlevts_filt, nlevts_map, nlevps_filt, nlevps_map)), 'collecting nhls'):
fname = 'vmapnhl%s_%s_%s' % ('jTP' * joint_TP, qe_key1, qe_key2) + utils.clhash(np.array([nlevt_f, nlevt_m, nlevp_f, nlevp_m]))
if not cacher.is_cached(fname):
ivf_cls = get_ivf_cls(cls_cmb_dat, cls_cmb_filt, lmin, lmax, nlevt_f, nlevp_f, nlevt_m, nlevp_m, transf, jt_tp=joint_TP)[0]
this_nhl = nhl.get_nhl(qe_key1, qe_key2, cls_weight, ivf_cls, lmax, lmax, lmax_out=lmax_qlm)
cacher.cache(fname, this_nhl)
Nhls.append(cacher.load(fname))
return Nhls