# Source code for plancklens.patchy.patchy

"""This module contain methods for QE-related analytical predictions on data or filtering with inhomogeneous noise

"""
import healpy as hp
import numpy as np
from plancklens import utils, nhl, qresp
from plancklens.helpers import cachers
from plancklens.qcinv.util import read_map
def _read_map(m):
    return read_map(m)


[docs] def get_patchy_N0s(qekey_in, npatches, pixivmap_t, pixivmap_p, cls_unl, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf, rvmap_uKamin_t_data=None, rvmap_uKamin_p_data=None, joint_TP=False, nlevt_fid=None, nlevp_fid=None, cacher=cachers.cacher_mem(), source='p', patch_method='percentiles', verbose=False): """Collects the effective reconstruction noise levels for different filtering and spectrum weighting schemes Args: qekey_in: QE anisotroy key npatches: the variance map will be split into this number of regions of equal sky areas pixivmap_t: inverse temperature noise pixel variance map used for the T. filtering pixivmap_p: inverse polarization noise pixel variance map used for the Pol. filtering cls_unl: unlensed CMB dict cls_cmb_dat: CMB spectra dict entering the data maps cls_cmb_filt: CMB spectra dict entering the filtering steps cls_weight: CMB spectra dict entering the QE weights (numerators) lmin_ivf: minimal CMB mutlipole lmax_ivf: maximal CMB multipole lmax_qlm: maximal QE multipole transf: CMB transfer function cl rvmap_uKamin_t_data (optional): set this to the data temperature noise map (in uK amin), if different from the one defining the filtering rvmap_uKamin_p_data (optional): set this to the data polarisation noise map (in uK amin), if different from the one defining the filtering joint_TP: set this to true if temperature and polarization are jointly filtered before building the QE nlevt_fid: set this to the fiducial temperature noise value to use for the single full-sky normalization nlevp_fid: set this to the fiducial polarisation noise value to use for the single full-sky normalization cacher: can use this to store results (descriptors only use the noise levels, joint_TP and qe_keys though) source: anistropy source for the responses calculations Returns: N0s: a dict of N0 arrays for different filtering and spectr weighting types MCcorr: prediction of the Monte-Carlo correction of the spectrum for inhom. 
filtering cMCcorr: Same for the cross-spectrum to the true lensing """ assert qekey_in[0] in ['p', 'x'], 'fix curl fiducial and MC correction' qe_key = 'p' + qekey_in[1:] if (not joint_TP) and qe_key == 'ptt': # dont need pol partitioning here nlevst_ftl, nlevst_data, _nlevt_fid, fskiest, masks = mk_patches(npatches, pixivmap_t, rvmap_uKamin_data=rvmap_uKamin_t_data, method=patch_method, verbose=verbose) nlevsp_ftl, nlevsp_data, _nlevp_fid, fskiesp = (1e30 * np.ones_like(nlevst_ftl), 1e30 * np.copy(nlevst_data), 1e30, fskiest.copy()) elif (not joint_TP) and qe_key == 'p_p':# dont need T here nlevsp_ftl, nlevsp_data, _nlevp_fid, fskiesp, masks = mk_patches(npatches, pixivmap_p, rvmap_uKamin_data=rvmap_uKamin_p_data, method=patch_method, verbose=verbose) nlevst_ftl, nlevst_data, _nlevt_fid, fskiest = (1e30 * np.ones_like(nlevsp_ftl), 1e30 * np.copy(nlevsp_data), 1e30, fskiesp.copy()) else: nlevst_ftl, nlevst_data, _nlevt_fid, fskiest, masks = mk_patches(npatches, pixivmap_t, rvmap_uKamin_data=rvmap_uKamin_t_data, method=patch_method, verbose=verbose) nlevsp_ftl, nlevsp_data, _nlevp_fid, fskiesp, masks = mk_patches(npatches, pixivmap_p, rvmap_uKamin_data=rvmap_uKamin_p_data, method=patch_method, verbose=verbose) if nlevt_fid is None: nlevt_fid = _nlevt_fid if nlevp_fid is None: nlevp_fid = _nlevp_fid assert np.allclose(fskiest, fskiesp, atol=1e-6), (np.array(fskiest)-np.array(fskiesp), fskiesp) fskies = fskiest cpp = cls_unl['pp'][:lmax_qlm+1] rid = 0 if qekey_in[0] == 'p' else 1 if qekey_in[0] == 'x': cpp *= 0. 
rfid = get_responses(qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf, [nlevt_fid], [nlevp_fid], joint_TP=joint_TP, cacher=cacher, source=source)[0] resps = get_responses(qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf, nlevst_ftl, nlevsp_ftl, joint_TP=joint_TP, cacher=cacher, source=source) nhls_pds = get_nhls(qe_key, qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf, nlevst_ftl, nlevst_data, nlevsp_ftl, nlevsp_data, joint_TP=joint_TP,cacher=cacher) nhls_fds = get_nhls(qe_key, qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin_ivf, lmax_ivf, lmax_qlm, transf, [nlevt_fid] * npatches, nlevst_data, [nlevp_fid] * npatches, nlevsp_data, joint_TP=joint_TP,cacher=cacher) labels = ['hom-filt, no-rew', 'hom-filt, mv-rew', 'inhom-filt, no-rew', 'inhom-filt, mv-rew'] N0s = {q: np.zeros(lmax_qlm + 1, dtype=float) for q in labels} MCcorr_vmap = np.zeros(lmax_qlm + 1, dtype=float) cMCcorr_vmap = np.zeros(lmax_qlm + 1, dtype=float) fsky_tot = np.sum(fskies) print('total observed sky area %.3f'%fsky_tot) print('Using nlev fids %.3f %.3f'%(nlevt_fid, nlevp_fid)) rfidi = utils.cli(rfid[rid]) for i, (fsky, resp, nhl_pd, nhl_fd) in enumerate(zip(fskies, resps, nhls_pds, nhls_fds)): fp_f = fsky / fsky_tot Rp_Rf = resp[rid] * rfidi N0s['hom-filt, no-rew'] += fp_f * (cpp + nhl_fd[rid] * rfidi ** 2) ** 2 # : spectrum of homo. filtered map without any spectra reweighting N0s['inhom-filt, no-rew'] += fp_f * (Rp_Rf ** 2 * cpp + nhl_pd[rid] * rfidi ** 2) ** 2 # : spectrum of inhomo. 
filtered map without any spectra reweighting N0s['hom-filt, mv-rew'] += fp_f * utils.cli((cpp + nhl_fd[rid] * rfidi ** 2) ** 2) # : inverse variance weighting of homog filtered map N0s['inhom-filt, mv-rew'] += fp_f * utils.cli((cpp + nhl_pd[rid] * rfidi ** 2 * utils.cli(Rp_Rf ** 2)) ** 2) # : inverse variance weighting of inhomog filtered map MCcorr_vmap += fp_f * Rp_Rf ** 2 cMCcorr_vmap += fp_f * Rp_Rf N0s['hom-filt, mv-rew'] = utils.cli(N0s['hom-filt, mv-rew']) N0s['inhom-filt, mv-rew'] = utils.cli(N0s['inhom-filt, mv-rew']) N0s['inhom-filt, no-rew'] *= utils.cli(MCcorr_vmap ** 2) for spec in N0s.values(): spec[:] = np.sqrt(spec) - cpp return N0s, MCcorr_vmap, cMCcorr_vmap
[docs] def mk_patches(Np, pix_ivmap, rvmap_uKamin_data=None, ret_masks=False, method='percentiles', verbose=False): """Splits the variance maps into equal-area regions with different noise levels Args: Np: desired number of patches pix_ivmap: input inverse pixel variance map used for the filtering rvmap_uKamin_data: root variance map in uK amin of the data (if different from pix_ivmap) ret_masks: returns the defined series of masks if set method: which method should be used to perform the calculation percentiles: equal sky areas regions linear: equally spaced nlevs in uK (this is best in terms of convergence towards an integral) """ mask = _read_map(pix_ivmap) > 0 npix = mask.size nside = hp.npix2nside(npix) nlev_map = utils.cli(np.sqrt(_read_map(pix_ivmap))) * np.sqrt(hp.nside2pixarea(nside)) / np.pi * 60 * 180. nlev_map_mask = nlev_map # map used to define the sky areas if np.unique(nlev_map_mask[np.where(mask)]).size <= 1 : assert rvmap_uKamin_data is not None, ('uniform map ? this wont work') nlev_map_mask = _read_map(rvmap_uKamin_data) mask = _read_map(nlev_map_mask) > 0 assert np.unique(nlev_map_mask[np.where(mask)]).size > 1 if method == 'percentiles': edges = np.percentile(nlev_map_mask[np.where(mask)], np.linspace(0, 100, Np + 1)) elif method == 'linear': edges = np.linspace(np.min(nlev_map_mask[np.where(mask)]), np.max(nlev_map_mask[np.where(mask)]), Np + 1) elif method == 'linear_vmap': ninv = _read_map(pix_ivmap) edges = np.linspace(np.min(ninv[np.where(mask)]), np.max(ninv[np.where(mask)]), Np + 1) del ninv edges = 1./np.sqrt(edges[::-1]) * np.sqrt(hp.nside2pixarea(nside)) / np.pi * 60 * 180. else: assert 0, 'method ' + method + ' not implemented' edges[0] = -1. 
edges[-1] = 10000 nlevs = [] # from filtering variance map nlevs_data = [] # from data variance map fskies = [] masks = [] for i in range(1, Np + 1): this_mask = (nlev_map > edges[i - 1]) & (nlev_map <= edges[i]) this_fsky = np.mean(mask * this_mask) if this_fsky > 0: nlevs.append(np.mean(nlev_map[mask * this_mask])) fskies.append(this_fsky) if rvmap_uKamin_data is not None: nlevs_data.append(np.mean(_read_map(rvmap_uKamin_data[mask * this_mask]))) if ret_masks: masks.append(this_mask * mask) if rvmap_uKamin_data is None: nlevs_data = nlevs nlev_fid = np.sqrt(4. * np.pi / npix / np.sum(_read_map(pix_ivmap)) * np.sum(mask)) * 180. * 60. / np.pi if verbose: for nf, nd in zip(nlevs, nlevs_data): print('%.2f (ftl) %.2f (dat) uKamin' % (nf, nd)) print('%.2f (fid)' %nlev_fid) return nlevs, nlevs_data, nlev_fid, fskies, masks
def get_nlev_fid(pix_ivmap): mask = _read_map(pix_ivmap) > 0 nlev_fid = np.sqrt(4. * np.pi / mask.size / np.sum(_read_map(pix_ivmap)) * np.sum(mask)) * 180. * 60. / np.pi return nlev_fid def get_fal(a, cl_len, nlev, transf, lmin, lmax): """Simple diagonal isotropic filter """ fal = utils.cli(cl_len.get(a + a)[:lmax + 1] + (nlev / 60. / 180. * np.pi) ** 2 / transf[:lmax + 1] ** 2) fal[:lmin] *= 0. return fal
[docs] def get_ivf_cls(cls_cmb_dat, cls_cmb_filt, lmin, lmax, nlevt_f, nlevp_f, nlevt_m, nlevp_m, transf, jt_tp=False): """inverse filtered spectra (spectra of Cov^-1 X) for CMB inverse-variance filtering Args: cls_cmb_dat: dict of cmb cls of the data maps cls_cmb_filt: dict of cmb cls used in the filtering matrix lmin: minimum multipole considered lmax: maximum multipole considered nlevt_f: fiducial temperature noise level used in the filtering in uK-amin nlevp_f: fiducial polarization noise level used in the filtering in uK-amin nlevt_m: temperature noise level of the data in uK-amin nlevp_m: polarization noise level of the data in uK-amin transf: CMB transfer function jt_tp: if set joint temperature-polarization filtering is performed. If not they are filtered independently Returns: dict of inverse-variance filtered maps spectra (for N0 calcs.) dict of filtering matrix spectra (for response calcs. This has no dependence on the data parts of the inputs) """ ivf_cls = {} if not jt_tp: filt_cls_i = {} for a in ['t']: ivf_cls[a + a] = get_fal(a, cls_cmb_filt, nlevt_f, transf, lmin, lmax) ** 2 * utils.cli(get_fal(a, cls_cmb_dat, nlevt_m, transf, 0, lmax)) filt_cls_i[a + a] = get_fal(a, cls_cmb_filt, nlevt_f, transf, lmin, lmax) for a in ['e', 'b']: ivf_cls[a + a] = get_fal(a, cls_cmb_filt, nlevp_f, transf, lmin, lmax) ** 2 * utils.cli(get_fal(a, cls_cmb_dat, nlevp_m, transf, 0, lmax)) filt_cls_i[a + a] = get_fal(a, cls_cmb_filt, nlevp_f, transf, lmin, lmax) ivf_cls['te'] = cls_cmb_dat['te'][:lmax + 1] * get_fal('e', cls_cmb_filt, nlevp_f, transf, lmin, lmax) * get_fal('t', cls_cmb_filt, nlevt_f, transf, lmin, lmax) return ivf_cls, filt_cls_i else: filt_cls = np.zeros((3, 3, lmax + 1), dtype=float) dat_cls = np.zeros((3, 3, lmax + 1), dtype=float) filt_cls[0, 0] = utils.cli(get_fal('t', cls_cmb_filt, nlevt_f, transf, lmin, lmax)) filt_cls[1, 1] = utils.cli(get_fal('e', cls_cmb_filt, nlevp_f, transf, lmin, lmax)) filt_cls[2, 2] = utils.cli(get_fal('b', cls_cmb_filt, 
nlevp_f, transf, lmin, lmax)) filt_cls[0, 1, lmin:] = cls_cmb_filt['te'][lmin:lmax + 1] filt_cls[1, 0, lmin:] = cls_cmb_filt['te'][lmin:lmax + 1] dat_cls[0, 0] = utils.cli(get_fal('t', cls_cmb_dat, nlevt_m, transf, lmin, lmax)) dat_cls[1, 1] = utils.cli(get_fal('e', cls_cmb_dat, nlevp_m, transf, lmin, lmax)) dat_cls[2, 2] = utils.cli(get_fal('b', cls_cmb_dat, nlevp_m, transf, lmin, lmax)) dat_cls[0, 1, lmin:] = cls_cmb_dat['te'][lmin:lmax + 1] dat_cls[1, 0, lmin:] = cls_cmb_dat['te'][lmin:lmax + 1] filt_cls_i = np.linalg.pinv(filt_cls.swapaxes(0, 2)).swapaxes(0, 2) return cls_dot(filt_cls_i, dat_cls, lmin, lmax), \ {'tt':filt_cls_i[0,0], 'ee':filt_cls_i[1, 1], 'bb':filt_cls_i[2, 2], 'te':filt_cls_i[0, 1]}
def cls_dot(cls_fidi, cls_dat, lmin, lmax): zro = np.zeros(lmax + 1, dtype=float) ret = {'tt':zro.copy(), 'te':zro.copy(), 'ee':zro.copy(), 'bb':zro.copy()} for i in range(3): for j in range(3): ret['tt'] += cls_fidi[0, i] * cls_fidi[0, j] * cls_dat[i, j] ret['te'] += cls_fidi[0, i] * cls_fidi[1, j] * cls_dat[i, j] ret['ee'] += cls_fidi[1, i] * cls_fidi[1, j] * cls_dat[i, j] ret['bb'] += cls_fidi[2, i] * cls_fidi[2, j] * cls_dat[i, j] for cl in ret.values(): cl[:lmin] *= 0 return ret
[docs] def get_responses(qe_key, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin, lmax, lmax_qlm, transf, nlevts_filt, nlevps_filt, joint_TP=False, cacher=cachers.cacher_mem(), source='p'): """Collects estimator responses for a list of filtering noise levels Args: qe_key: QE estimator key cls_cmb_dat: CMB cls of the data maps cls_cmb_filt: CMB cls used for the filtering cls_weight: CMB cls in the QE weights lmin: minimum CMB multipole considered lmax: maximum CMB multipole considered lmax_qlm: QE output lmax transf: CMB transfer function nlevts_filt: list or array of filtering temperature noise levels nlevps_filt: list or array of filtering polarization noise levels joint_TP: uses joint temperature and polarization filtering if set, separate if not cacher: can be used to store results source: QE response anisotropy source (defaults to lensing) Returns: lists of responses (GG, CC, GC CG for spin-weight QE) Note: Results may be stored with the cacher but only the filtering noise levels, QE keys and joint_TP are differentiated in the filename """ resps = [] for i, (nlevt_f, nlevp_f) in utils.enumerate_progress(list(zip(nlevts_filt, nlevps_filt)), 'collecting responses'): fname = 'vmapresps%s_%s_%s' % ('jTP' * joint_TP, qe_key, qe_key) + utils.clhash(np.array([nlevt_f, nlevp_f]), dtype=np.float32) if not cacher.is_cached(fname): cls_filt_i = get_ivf_cls(cls_cmb_dat, cls_cmb_filt, lmin, lmax, nlevt_f, nlevp_f, nlevt_f, nlevp_f, transf, jt_tp=joint_TP)[1] this_resp = qresp.get_response(qe_key, lmax, source, cls_weight, cls_cmb_dat, cls_filt_i, lmax_qlm=lmax_qlm) cacher.cache(fname, this_resp) resps.append(np.array(cacher.load(fname))) return np.array(resps)
[docs] def get_nhls(qe_key1, qe_key2, cls_cmb_dat, cls_cmb_filt, cls_weight, lmin, lmax, lmax_qlm, transf, nlevts_filt, nlevts_map, nlevps_filt, nlevps_map, joint_TP=False, cacher=cachers.cacher_mem()): """Collects unnormalized estimator noise levels for a list of filtering noise levels and data map noise levels Args: qe_key1: first QE estimator key qe_key2: second QE estimator key cls_cmb_dat: CMB cls of the data maps cls_cmb_filt: CMB cls used for the filtering cls_weight: CMB cls in the QE weights lmin: minimum CMB multipole considered lmax: maximum CMB multipole considered lmax_qlm: QE output lmax transf: CMB transfer function nlevts_filt: list or array of filtering temperature noise levels nlevts_map: list or array of data map temperature noise levels nlevps_filt: list or array of filtering polarization noise levels nlevps_map: list or array of data maptemperature noise levels joint_TP: uses joint temperature and polarization filtering if set, separate if not cacher: can be used to store results Returns: lists of reconstruction noise levels (GG, CC, GC CG for spin-weight QE) Note: Results may be stored with the cacher but only the filtering and data noise levels, QE keys and joint_TP are differentiated in the filename """ Nhls = [] for i, (nlevt_f, nlevt_m, nlevp_f, nlevp_m) in utils.enumerate_progress(list(zip(nlevts_filt, nlevts_map, nlevps_filt, nlevps_map)), 'collecting nhls'): fname = 'vmapnhl%s_%s_%s' % ('jTP' * joint_TP, qe_key1, qe_key2) + utils.clhash(np.array([nlevt_f, nlevt_m, nlevp_f, nlevp_m])) if not cacher.is_cached(fname): ivf_cls = get_ivf_cls(cls_cmb_dat, cls_cmb_filt, lmin, lmax, nlevt_f, nlevp_f, nlevt_m, nlevp_m, transf, jt_tp=joint_TP)[0] this_nhl = nhl.get_nhl(qe_key1, qe_key2, cls_weight, ivf_cls, lmax, lmax, lmax_out=lmax_qlm) cacher.cache(fname, this_nhl) Nhls.append(np.array(cacher.load(fname))) return np.array(Nhls)