Source code for filt_simple

"""simple CMB filtering module.

This module collects a couple of fast (non-iterative) filtering methods.

"""
from __future__ import print_function

import healpy as hp
import numpy  as np
import pickle as pk
import os

from plancklens.helpers import mpi
from plancklens import utils

[docs] class library_sepTP(object): """Template class for CMB inverse-variance and Wiener-filtering library. This is suitable whenever the temperature and polarization maps are independently filtered. Args: lib_dir (str): directory where hashes and filtered maps will be cached. sim_lib : simulation library instance. *sim_lib* must have *get_sim_tmap* and *get_sim_pmap* methods. cl_weights: CMB spectra, used to compute the Wiener-filtered CMB from the inverse variance filtered maps. """ def __init__(self, lib_dir, sim_lib, cl_weights, soltn_lib=None, cache=True): self.lib_dir = lib_dir self.sim_lib = sim_lib self.cl = cl_weights self.soltn_lib = soltn_lib self.cache = cache fn_hash = os.path.join(lib_dir, 'filt_hash.pk') if mpi.rank == 0: if not os.path.exists(lib_dir): os.makedirs(lib_dir) if not os.path.exists(fn_hash): pk.dump(self.hashdict(), open(fn_hash, 'wb'), protocol=2) mpi.barrier() utils.hash_check(pk.load(open(fn_hash, 'rb')), self.hashdict(), fn=fn_hash) def hashdict(self): assert 0, 'override this' def get_fmask(self): assert 0, 'override this' def _apply_ivf_t(self, tmap, soltn=None): assert 0, 'override this' def _apply_ivf_p(self, pmap, soltn=None): assert 0, 'override this'
[docs] def get_ftl(self): """Isotropic approximation to temperature inverse variance filtering. :math:`F^{T}_\ell = (C_\ell^{TT} + N^{T}_\ell / b_\ell^2)^{-1}` """ assert 0, 'override this'
[docs] def get_fel(self): """Isotropic approximation to E-polarization inverse variance filtering. :math:`F^{E}_\ell = (C_\ell^{EE} + N^{E}_\ell / b_\ell^2)^{-1}` """ assert 0, 'override this'
[docs] def get_fbl(self): """Isotropic approximation to B-polarization inverse variance filtering. :math:`F^{B}_\ell = (C_\ell^{BB} + N^{B}_\ell / b_\ell^2)^{-1}` """ assert 0, 'override this'
def get_tal(self, a): assert 0, 'override this'
[docs] def get_sim_tlm(self, idx): """Returns an inverse-filtered temperature simulation. Args: idx: simulation index Returns: inverse-filtered temperature healpy alm array """ tfname = os.path.join(self.lib_dir, 'sim_%04d_tlm.fits'%idx if idx >= 0 else 'dat_tlm.fits') if not os.path.exists(tfname): tlm = self._apply_ivf_t(self.sim_lib.get_sim_tmap(idx), soltn=None if self.soltn_lib is None else self.soltn_lib.get_sim_tmliklm(idx)) if self.cache: hp.write_alm(tfname, tlm, overwrite=True) return tlm return hp.read_alm(tfname)
[docs] def get_sim_elm(self, idx): """Returns an inverse-filtered E-polarization simulation. Args: idx: simulation index Returns: inverse-filtered E-polarization healpy alm array """ tfname = os.path.join(self.lib_dir, 'sim_%04d_elm.fits'%idx if idx >= 0 else 'dat_elm.fits') if not os.path.exists(tfname): if self.soltn_lib is None: soltn = None else: soltn = np.array([self.soltn_lib.get_sim_emliklm(idx), self.soltn_lib.get_sim_bmliklm(idx)]) elm, blm = self._apply_ivf_p(self.sim_lib.get_sim_pmap(idx), soltn=soltn) if self.cache: hp.write_alm(tfname, elm, overwrite=True) hp.write_alm(os.path.join(self.lib_dir, 'sim_%04d_blm.fits'%idx if idx >= 0 else 'dat_blm.fits'), blm, overwrite=True) return elm else: return hp.read_alm(tfname)
[docs] def get_sim_blm(self, idx): """Returns an inverse-filtered B-polarization simulation. Args: idx: simulation index Returns: inverse-filtered B-polarization healpy alm array """ tfname = os.path.join(self.lib_dir, 'sim_%04d_blm.fits'%idx if idx >= 0 else 'dat_blm.fits') if not os.path.exists(tfname): if self.soltn_lib is None: soltn = None else: soltn = np.array([self.soltn_lib.get_sim_emliklm(idx), self.soltn_lib.get_sim_bmliklm(idx)]) elm, blm = self._apply_ivf_p(self.sim_lib.get_sim_pmap(idx), soltn=soltn) if self.cache: hp.write_alm(tfname, blm, overwrite=True) hp.write_alm(os.path.join(self.lib_dir, 'sim_%04d_elm.fits'%idx if idx >= 0 else 'dat_elm.fits'), elm, overwrite=True) return blm else: return hp.read_alm(tfname)
[docs] def get_sim_tmliklm(self, idx): """Returns a Wiener-filtered temperature simulation. Args: idx: simulation index Returns: Wiener-filtered temperature healpy alm array """ return hp.almxfl(self.get_sim_tlm(idx), self.cl['tt'])
[docs] def get_sim_emliklm(self, idx): """Returns a Wiener-filtered E-polarization simulation. Args: idx: simulation index Returns: Wiener-filtered E-polarization healpy alm array """ return hp.almxfl(self.get_sim_elm(idx), self.cl['ee'])
[docs] def get_sim_bmliklm(self, idx): """Returns a Wiener-filtered B-polarization simulation. Args: idx: simulation index Returns: Wiener-filtered B-polarization healpy alm array """ return hp.almxfl(self.get_sim_blm(idx), self.cl['bb'])
[docs] class library_jTP(object): """Template class for CMB inverse-variance and Wiener-filtering library. This one is suitable whenever the temperature and polarization maps are jointly filtered. Args: lib_dir (str): directory where hashes and filtered maps will be cached. sim_lib : simulation library instance. *sim_lib* must have *get_sim_tmap* and *get_sim_pmap* methods. cl_weights: CMB spectra, used to compute the Wiener-filtered CMB from the inverse variance filtered maps. """ def __init__(self, lib_dir, sim_lib, cl_weights, soltn_lib=None, cache=True): assert np.all([k in cl_weights.keys() for k in ['tt', 'ee', 'bb']]) self.lib_dir = lib_dir self.sim_lib = sim_lib self.cl = cl_weights self.soltn_lib = soltn_lib self.cache = cache fn_hash = os.path.join(lib_dir, 'filt_hash.pk') if mpi.rank == 0: if not os.path.exists(lib_dir): os.makedirs(lib_dir) if not os.path.exists(fn_hash): pk.dump(self.hashdict(), open(fn_hash, 'wb'), protocol=2) mpi.barrier() utils.hash_check(pk.load(open(fn_hash, 'rb')), self.hashdict(), fn=fn_hash) def hashdict(self): assert 0, 'override this' def get_fmask(self): assert 0, 'override this' def _apply_ivf(self, tqumap, soltn=None): assert 0, 'override this'
[docs] def get_fal(self): """Isotropic matrix approximation to inverse variance filtering :math:`F_\ell \sim (C_\ell + N_\ell / b_\ell^2)^{-1}` Output is dictionary with the usual 'tt', 'ee', 'te', 'bb', ... keys. """ assert 0, 'override this'
def _get_alms(self, a, idx): assert a in ['t', 'e', 'b'] tfname = os.path.join(self.lib_dir, 'sim_%04d_tlm.fits' % idx if idx >= 0 else 'dat_tlm.fits') fname = tfname.replace('tlm.fits', a + 'lm.fits') if not os.path.exists(fname): T = self.sim_lib.get_sim_tmap(idx) Q, U = self.sim_lib.get_sim_pmap(idx) if self.soltn_lib is None: soltn = None else: tlm = self.soltn_lib.get_sim_tmliklm(idx) elm = self.soltn_lib.get_sim_emliklm(idx) blm = self.soltn_lib.get_sim_bmliklm(idx) soltn = (tlm, elm, blm) tlm, elm, blm = self._apply_ivf([T, Q, U], soltn=soltn) if self.cache: hp.write_alm(tfname.replace('tlm.fits', 'tlm.fits'), tlm) hp.write_alm(tfname.replace('tlm.fits', 'elm.fits'), elm) hp.write_alm(tfname.replace('tlm.fits', 'blm.fits'), blm) return hp.read_alm(fname)
[docs] def get_sim_tlm(self, idx): """Returns an inverse-filtered temperature simulation. Args: idx: simulation index Returns: inverse-filtered temperature healpy alm array """ return self._get_alms('t', idx)
[docs] def get_sim_elm(self, idx): """Returns an inverse-filtered E-polarization simulation. Args: idx: simulation index Returns: inverse-filtered E-polarization healpy alm array """ return self._get_alms('e', idx)
[docs] def get_sim_blm(self, idx): """Returns an inverse-filtered B-polarization simulation. Args: idx: simulation index Returns: inverse-filtered B-polarization healpy alm array """ return self._get_alms('b', idx)
[docs] def get_sim_tmliklm(self, idx): """Returns a Wiener-filtered temperature simulation. Args: idx: simulation index Returns: Wiener-filtered temperature healpy alm array """ ret = hp.almxfl(self.get_sim_tlm(idx), self.cl['tt']) for k in ['te', 'tb']: cl = self.cl.get(k[0] + k[1], self.cl.get(k[1] + k[0], None)) if cl is not None: ret += hp.almxfl(self._get_alms(k[1], idx), cl) return ret
[docs] def get_sim_emliklm(self, idx): """Returns a Wiener-filtered E-polarization simulation. Args: idx: simulation index Returns: Wiener-filtered E-polarization healpy alm array """ ret = hp.almxfl(self.get_sim_elm(idx), self.cl['ee']) for k in ['et', 'eb']: cl = self.cl.get(k[0] + k[1], self.cl.get(k[1] + k[0], None)) if cl is not None: ret += hp.almxfl(self._get_alms(k[1], idx), cl) return ret
[docs] def get_sim_bmliklm(self, idx): """Returns a Wiener-filtered B-polarization simulation. Args: idx: simulation index Returns: Wiener-filtered B-polarization healpy alm array """ ret = hp.almxfl(self.get_sim_blm(idx), self.cl['bb']) for k in ['bt', 'be']: cl = self.cl.get(k[0] + k[1], self.cl.get(k[1] + k[0], None)) if cl is not None: ret += hp.almxfl(self._get_alms(k[1], idx), cl) return ret
[docs] class library_fullsky_sepTP(library_sepTP): """Full-sky isotropic filtering instance. Args: lib_dir: directory where hashes and filtered maps will be cached. sim_lib: simulation library instance to inverse-filter nside: healpix resolution of the simulation library cl_len : CMB spectra, used to compute the Wiener-filtered CMB from the inverse variance filtered maps. transf : fiducial transfer function of the CMB maps. (if dict, then must have keys 't' 'e' and 'b' for individual transfer functions) ftl (1d-array): isotropic filtering array for temperature (filtered tlm's are ftl * tlm of the data) fel (1d-array): isotropic filtering array for E-pol. (filtered elm's are fel * elm of the data) fbl (1d-array): isotropic filtering array for B-po. (filtered blm's are fbl * blm of the data) cache: filtered alm's will be cached if set. """ def __init__(self, lib_dir, sim_lib, nside, transf:np.ndarray or dict, cl_len, ftl, fel, fbl, cache=False): transfd = transf if isinstance(transf, dict) else {'t': transf, 'e': transf, 'b': transf} assert 't' in transfd.keys() and 'e' in transfd.keys() and 'b' in transfd.keys() self.sim_lib = sim_lib self.ftl = ftl self.fel = fel self.fbl = fbl self.lmax_fl = np.max([len(ftl), len(fel), len(fbl)]) - 1 self.nside = nside self.transf = transfd super(library_fullsky_sepTP, self).__init__(lib_dir, sim_lib, cl_len, cache=cache) def hashdict(self): return {'sim_lib':self.sim_lib.hashdict(), 'transf': utils.clhash(self.transf['t']), 'cl_len': {k: utils.clhash(self.cl[k]) for k in ['tt', 'ee', 'bb']}, 'ftl': utils.clhash(self.ftl), 'fel': utils.clhash(self.fel), 'fbl': utils.clhash(self.fbl)} def get_fmask(self): return np.ones(hp.nside2npix(self.nside), dtype=float) def get_tal(self, a): assert (a.lower() in ['t', 'e', 'b']) return utils.cli(self.transf[a.lower()]) def get_ftl(self): return np.copy(self.ftl) def get_fel(self): return np.copy(self.fel) def get_fbl(self): return np.copy(self.fbl) def _apply_ivf_t(self, tmap, soltn=None): assert len(tmap) == hp.nside2npix(self.nside), (hp.npix2nside(tmap.size), self.nside) alm = hp.map2alm(tmap, lmax=self.lmax_fl, iter=0) return hp.almxfl(alm, self.get_ftl() * utils.cli(self.transf['t'][:len(self.ftl)])) def _apply_ivf_p(self, pmap, soltn=None): assert len(pmap[0]) == hp.nside2npix(self.nside) and len(pmap[0]) == len(pmap[1]) elm, blm = hp.map2alm_spin([m for m in pmap], 2, lmax=self.lmax_fl) elm = hp.almxfl(elm, self.get_fel() * utils.cli(self.transf['e'][:len(self.fel)])) blm = hp.almxfl(blm, self.get_fbl() * utils.cli(self.transf['b'][:len(self.fbl)])) return elm, blm
class library_fullsky_alms_sepTP(library_sepTP): """Full-sky isotropic filtering instance, but with harmonic space inputs Args: lib_dir: directory where hashes and filtered maps will be cached. sim_lib: simulation library instance to inverse-filter cl_len : CMB spectra, used to compute the Wiener-filtered CMB from the inverse variance filtered maps. transf : fiducial transfer function of the CMB maps. (if dict, then must have keys 't' 'e' and 'b' for individual transfer functions) ftl (1d-array): isotropic filtering array for temperature (filtered tlm's are ftl * tlm of the data) fel (1d-array): isotropic filtering array for E-pol. (filtered elm's are fel * elm of the data) fbl (1d-array): isotropic filtering array for B-po. (filtered blm's are fbl * blm of the data) cache: filtered alm's will be cached if set. """ def __init__(self, lib_dir, sim_lib, transf:np.ndarray or dict, cl_len, ftl, fel, fbl, cache=False): transfd = transf if isinstance(transf, dict) else {'t': transf, 'e': transf, 'b': transf} assert 't' in transfd.keys() and 'e' in transfd.keys() and 'b' in transfd.keys() self.sim_lib = sim_lib self.ftl = ftl self.fel = fel self.fbl = fbl self.lmax_fl = np.max([len(ftl), len(fel), len(fbl)]) - 1 self.transf = transfd super(library_fullsky_alms_sepTP, self).__init__(lib_dir, sim_lib, cl_len, cache=cache) def hashdict(self): return {'sim_lib':self.sim_lib.hashdict(), 'transf': utils.clhash(self.transf['t']), 'cl_len': {k: utils.clhash(self.cl[k]) for k in ['tt', 'ee', 'bb']}, 'ftl': utils.clhash(self.ftl), 'fel': utils.clhash(self.fel), 'fbl': utils.clhash(self.fbl)} def get_fmask(self): return np.array([1.]) # For compatibility purposes only... def get_tal(self, a): assert (a.lower() in ['t', 'e', 'b']) return utils.cli(self.transf[a.lower()]) def get_ftl(self): return np.copy(self.ftl) def get_fel(self): return np.copy(self.fel) def get_fbl(self): return np.copy(self.fbl) def _apply_ivf_t(self, tlm, soltn=None): #if (hp.Alm.getlmax(tlm.size) + 1) > len(self.ftl): # tlm = utils.alm_copy(tlm, lmax=self.ftl) return hp.almxfl(tlm, self.get_ftl() * utils.cli(self.transf['t'][:len(self.ftl)])) def _apply_ivf_p(self, eblm, soltn=None): #if (hp.Alm.getlmax(eblm[0].size) + 1) > len(self.fel): # eblm[0] = utils.alm_copy(eblm[0], lmax=len(self.fel) - 1) #if (hp.Alm.getlmax(eblm[1].size) + 1) > len(self.fbl): # eblm[1] = utils.alm_copy(eblm[1], lmax=len(self.fbl) - 1) elm = hp.almxfl(eblm[0], self.get_fel() * utils.cli(self.transf['e'][:len(self.fel)])) blm = hp.almxfl(eblm[1], self.get_fbl() * utils.cli(self.transf['b'][:len(self.fbl)])) return elm, blm
[docs] class library_apo_sepTP(library_sepTP): """ Library to perform inverse variance filtering on the sim_lib library using simple mask apo and isotropic filtering. Args: lib_dir: directory where hashes and filtered maps will be cached. sim_lib: simulation library instance to inverse-filter apomask_path : path of the (presumably apodized) mask cl_len : CMB spectra, used to compute the Wiener-filtered CMB from the inverse variance filtered maps. transf : fiducial transfer function of the CMB maps. ftl (1d-array): isotropic filtering array for temperature (filtered tlm's are ftl * tlm of the data) fel (1d-array): isotropic filtering array for E-pol. (filtered elm's are fel * elm of the data) fbl (1d-array): isotropic filtering array for B-po. (filtered blm's are fbl * blm of the data) cache: filtered alm's will be cached if set. """ def __init__(self, lib_dir, sim_lib, apomask_path, cl_len, transf, ftl, fel, fbl, cache=False): assert len(transf) >= np.max([len(ftl), len(fel), len(fbl)]) assert np.all([k in cl_len.keys() for k in ['tt', 'ee', 'bb']]) assert os.path.exists(apomask_path) self.ftl = ftl self.fel = fel self.fbl = fbl self.transf = transf self.lmax_fl = np.max([len(ftl), len(fel), len(fbl)]) - 1 self.apomask_path = apomask_path self.nside = hp.npix2nside(hp.read_map(apomask_path).size) super(library_apo_sepTP, self).__init__(lib_dir, sim_lib, cl_len, cache=cache) def hashdict(self): return {'sim_lib':self.sim_lib.hashdict(), 'apomask': self.apomask_path, 'transf': utils.clhash(self.transf), 'cl_len': {k: utils.clhash(self.cl[k]) for k in ['tt', 'ee', 'bb']}, 'ftl': utils.clhash(self.ftl), 'fel': utils.clhash(self.fel), 'fbl': utils.clhash(self.fbl)} def get_fmask(self): return hp.read_map(self.apomask_path) def get_tal(self, a): assert (a.lower() in ['t', 'e', 'b']) return utils.cli(self.transf) def get_ftl(self): return np.copy(self.ftl) def get_fel(self): return np.copy(self.fel) def get_fbl(self): return np.copy(self.fbl) def _apply_ivf_t(self, tmap, soltn=None): assert len(tmap) == hp.nside2npix(self.nside), (hp.npix2nside(tmap.size), self.nside) alm = hp.map2alm(tmap * self.get_fmask(), lmax=self.lmax_fl, iter=0) return hp.almxfl(alm, self.get_ftl() * utils.cli(self.transf[:len(self.ftl)])) def _apply_ivf_p(self, pmap, soltn=None): assert len(pmap[0]) == hp.nside2npix(self.nside) and len(pmap[0]) == len(pmap[1]) elm, blm = hp.map2alm_spin([m * self.get_fmask() for m in pmap], 2, lmax=self.lmax_fl) elm = hp.almxfl(elm, self.get_fel() * utils.cli(self.transf[:len(self.fel)])) blm = hp.almxfl(blm, self.get_fbl() * utils.cli(self.transf[:len(self.fbl)])) return elm, blm