From bf6190c7711f5c59edc91ba1643758fcb87f3e22 Mon Sep 17 00:00:00 2001 From: jstarck Date: Mon, 9 Jun 2025 17:29:36 +0200 Subject: [PATCH] starlet transform with non dyadic scales --- pycs/sparsity/mrs/mrs_starlet.py | 510 +++++++++++++++++++++++++++---- 1 file changed, 455 insertions(+), 55 deletions(-) diff --git a/pycs/sparsity/mrs/mrs_starlet.py b/pycs/sparsity/mrs/mrs_starlet.py index 5f0beb9..5fad719 100644 --- a/pycs/sparsity/mrs/mrs_starlet.py +++ b/pycs/sparsity/mrs/mrs_starlet.py @@ -52,6 +52,8 @@ from pycs.sparsity.mrs.mrs_tools import * import getpass import time +from scipy.optimize import curve_fit + def test_mrs_class(Init=None): @@ -65,12 +67,13 @@ def test_mrs_class(Init=None): else: Nside = 1024 d = np.random.normal(size=(Nside**2 * 12)) - Ns = 9 + Ns = 5 ALM_iter = 0 C = CMRStarlet() C.init_starlet(Nside, nscale=Ns, ALM_iter=ALM_iter) print("PYTHON Code computation time:") start = time.time() + C.TypeCode=1 C.transform(d) end = time.time() print(f"Execution time python: {end - start:.4f} seconds") @@ -94,7 +97,7 @@ def test_mrs_class(Init=None): C.threshold(SigmaNoise=20) r = C.recons() if Init is None: - info(d - r, "Reconstruction error: ") + info(d - r, "Denoising: Residual Standard Deviation: ") tvs(r, title="Denoised map") if Init is None: tvs(d - r, title="Estimated noise map") @@ -108,6 +111,20 @@ def test_mrs_class(Init=None): C.eval_computation_time() + # Wavelet transform with non-dyadic scales + C = CMRStarlet() + TabResolSigma= np.array([4, 5, 7,10,12,15,20,30, 60]) + # TabResolSigma = np.array([7.45302012, 14.90604023, 29.81208047, 59.62416094 , 119.24832187, 238.49664375]) + C.init_starlet(Nside, TabResolSigma=TabResolSigma) + + n = np.random.normal(size=(Nside**2 * 12)) + C.transform(n) + print("Check the theoretical norm estimation of Gaussian noise with std=1:") + for j in range(C.ns): + s = C.coef[j,:] + print(" Standard deviation of coefficient at scale ", j+1, ": Sigma = ", s.std()) + print( " Theoretical estimation = ", C.TabNorm[j]) + return C @@ -128,11 +145,45 @@ def pycs_env_status(): print("Return code:", result.returncode) + +def sigma_filter(F_ell, nside, lmax=None, PixelWindow=False): + """ + Calcule l'écart type théorique d'une carte HEALPix de bruit blanc gaussien + convoluée par un filtre F_ell donné dans l'espace harmonique sphérique. + + Paramètres : + - F_ell : array-like, filtre en harmonique sphérique (F_ell pour chaque ell) + - sigma2_pixel : float, variance du bruit blanc par pixel (non convolué) + - nside : int, résolution HEALPix de la carte + + Retour : + - float, écart type théorique de la carte filtrée + """ + sigma2_pixel=1. + npix = 12 * nside**2 + if lmax is None: + lmax = 3 * nside + pixwin = hp.pixwin(nside, lmax=lmax) # pixel window function + # F_ell_eff = F_ell * pixwin + + ell = np.arange(len(F_ell)) + if PixelWindow: + ell = ell * hp.pixwin(nside, lmax=len(F_ell)-1) + facteur = (2 * ell + 1) * np.abs(F_ell)**2 + variance_theorique = sigma2_pixel / npix * np.sum(facteur) + return np.sqrt(variance_theorique) + + MRS_StarletTabNorm = np.array( [0.969856, 0.103676, 0.051809, 0.025798, 0.012852, 0.006446, 0.003230, 0.001725] ) +def assert_strictly_increasing(table): + for i in range(len(table) - 1): + if table[i] >= table[i + 1]: + raise ValueError(f"Table is not strictly increasing at index {i}: {table[i]} >= {table[i+1]}") + class CMRStarlet: """ Class for the starlet decomposition and reconstruction @@ -151,7 +202,16 @@ class CMRStarlet: ALM_iter = 0 TabNameCode = ["Full python", "c++ Binding", "c++ binary"] TypeCode = 0 # 0 for 'Full python', '1' for 'c++ Binding' and 2 for'c++ binary' - + Tablmax =0 # lmax for each scale + TabSigma =0 # Standard deviation of the Gaussian which fits the scaling function at every scale + TabPhi = 0 # Scaling function for each scale + TabPsi = 0 # Wavelet function for each scale + Tabh = 0 # h filter for each scale + Tabg = 0 # g filter for each scale + TabResol = 0 # Resolution of each wavelet scale in arc minute + PixelResol = 0 # pixel sizr in arc minute + l2norm = False + # __init__ is the constructor def __init__(self, name="wt", verb=False): """ @@ -170,7 +230,7 @@ def __init__(self, name="wt", verb=False): self.name = name # self.name is an object variable self.verb = verb - def init_starlet(self, nside, nscale=0, lmax=0, ALM_iter=0): + def init_starlet(self, nside, nscale=0, lmax=0, ALM_iter=0, TabResolSigma=None): """ Initialize the scale for a given image size and a number of scales. Parameters @@ -187,37 +247,67 @@ def init_starlet(self, nside, nscale=0, lmax=0, ALM_iter=0): """ self.nside = np.int64(nside) self.nx = 12 * self.nside * self.nside - if nscale == 0: - nscale = np.int64(np.log(nx) // 1) - self.ns = np.int64(nscale) if lmax != 0: self.lmax = np.int64(lmax) else: self.lmax = 3 * self.nside + + if TabResolSigma is not None: + nscale = TabResolSigma.shape[0] + 1 + self.TabSigma = np.zeros(nscale) + self.PixelResol = amin_pixel_size(self.nside) + self.TabSigma [0] = splinelmax2sigma(self.lmax) + if TabResolSigma.min() < self.TabSigma [0]: + raise ValueError(f"Smoothing Gaussian standard deviation must be larger than {self.TabSigma [0]:.2f}: {TabResolSigma.min():.2f} is not valid.") + self.TabSigma [1:] =TabResolSigma + assert_strictly_increasing(self.TabSigma) + self.Tablmax = sigma2splinelmax(self.TabSigma, False) + else: + if nscale == 0: + nscale = np.int64(np.log(self.nside) // 1) + 1 + self.ns = np.int64(nscale) + if ALM_iter != 0: self.ALM_iter = np.int64(ALM_iter) if MRS_CXX and self.TypeCode == 1: CMRS = pymrs.MRS() CMRS.alloc(nside, self.ns, self.lmax, self.ALM_iter, self.verb) - - Ne = len(MRS_StarletTabNorm) - self.TabNorm = np.zeros(self.ns, dtype=np.float64) - if Ne >= self.ns: - self.TabNorm = MRS_StarletTabNorm[: self.ns] + + if TabResolSigma is None: + self.Tablmax, self.TabSigma, self.TabPhi, self.TabPsi, self.Tabh, self.Tabg = get_default_filters(self.nside, self.ns) + # print("Default TabSigma = ", self.TabSigma) else: - self.TabNorm[:Ne] = MRS_StarletTabNorm - val = MRS_StarletTabNorm[-1] - for i in range(Ne, self.ns): - val /= 2 - self.TabNorm[i] = val - # print("TabNorm = ", self.TabNorm) - + self.TabPhi, self.TabPsi, self.Tabh, self.Tabg = get_sigmafilters(self.TabSigma, self.lmax, Phi0Spline=False) + + self.TabResol = np.zeros(self.ns) + self.TabResol[0] = self.TabSigma[0] + for j in range(1,self.ns-1): + self.TabResol[j] = self.TabSigma[j-1] + (self.TabSigma[j] - self.TabSigma[j-1]) / 2. + self.TabResol[self.ns-1] = self.TabSigma[self.ns-1] + + # print("TB ", self.TabResol) + # Estimate the normalization factor for the coefficients. + self.TabNorm = np.zeros(self.ns, dtype=np.float64) + self.TabNorm[0] = 1. + PixelWindow=False + Phi0Lmax = sigma_filter(self.TabPhi[:,0], self.nside, lmax=self.lmax, PixelWindow=PixelWindow) + DeltaPhi0 = np.sqrt(1. - Phi0Lmax**2) + # the alm m around lmax are often not accurate, and 1 if a conservative value, the + # the correct value is most likely between 0.9 and 1. + self.TabNorm[0] = np.sqrt( DeltaPhi0**2 + (sigma_filter(self.TabPsi[:,0], self.nside, lmax=self.lmax, PixelWindow=PixelWindow))**2) + for j in range(1,self.ns): + self.TabNorm[j] = sigma_filter(self.TabPsi[:,j], self.nside, lmax=self.lmax, PixelWindow=PixelWindow) + def info(self): # sound is a method (a method is a function of an object) """ Print information relative to the intialisation. """ print(self.name, ": Npix = ", self.nx, ", Ns = ", self.ns) + for j in range(self.ns-1): + print("%s Band %2d, Scale %5.2f amin " + % (self.name, j + 1, self.TabResol[j]) + ) # print("Coef TabSize = ", np.shape(self.coef)) def stat(self): @@ -236,10 +326,21 @@ def stat(self): for j in range(self.ns): s = (self.coef)[j] print( - "%s Scale %2d: Min = %f, Max = %f, Mean = %f, std = %f" - % (self.name, j + 1, s.min(), s.max(), s.mean(), s.std()) + "%s Band %2d, Scale %5.2f amin: Min = %f, Max = %f, Mean = %f, std = %f" + % (self.name, j + 1, self.TabResol[j], s.min(), s.max(), s.mean(), s.std()) ) + def plot_filter(self, wavelet=True, scaling=False, hfilter=False, gfilter=False): + + if wavelet: + plot_filter(self.TabPsi, title="Spherical wavelet Filters", xlabel="Multipole l", ylabel="Amplitude", legend_prefix="Scale") + if scaling: + plot_filter(self.TabPhi, title="Spherical Scaling Filters", xlabel="Multipole l", ylabel="Amplitude", legend_prefix="Scale") + if hfilter: + plot_filter(self.Tabh, title="Spherical H-Filters", xlabel="Multipole l", ylabel="Amplitude", legend_prefix="Scale") + if gfilter: + plot_filter(self.Tabg, title="Spherical G-Filters", xlabel="Multipole l", ylabel="Amplitude", legend_prefix="Scale") + def transform(self, data, WTname=None, opt=None): """ Apply the starlet transform to image. Coeffients are stored in @@ -273,19 +374,16 @@ def transform(self, data, WTname=None, opt=None): CMRS.alloc(self.nside, self.ns, self.nside * 3, self.ALM_iter, True) self.coef = CMRS.uwt(im, self.ns) elif self.TypeCode == 2: - self.coef = mrs_uwttrans( - im, self.ns, self.lmax, opt=opt, verbose=self.verb, path="./", cxx=True - ) + self.coef = mrs_uwttrans(im, self.ns, self.lmax, opt=opt, verbose=self.verb, path="./", cxx=True) else: - self.coef = mrs_uwttrans( - im, - self.ns, - self.lmax, - opt=None, - verbose=self.verb, - path="./", - cxx=False, - ) + # print(self.ns, self.TabPhi.shape) + self.coef = wt_phi_filter_trans(im, self.TabPhi) + # self.coef = mrs_uwttrans(im,self.ns,self.lmax,opt=None,verbose=self.verb,path="./",cxx=False) + + if self.l2norm is True: + for j in range(self.ns): + self.coef[j, :] = self.coef[j, :] / self.TabNorm[j] + def recons(self): """ @@ -298,6 +396,10 @@ def recons(self): rec : 1D np.ndarray Reconstructed image. """ + + if self.l2norm is True: + for j in range(self.ns): + self.coef[j, :] = self.coef[j, :] * self.TabNorm[j] return np.sum(self.coef, axis=0) def denoising(self, data, SigmaNoise=0, Nsigma=3, ThresCoarse=False, hard=True): @@ -582,7 +684,7 @@ def eval_computation_time(self): start = time.time() w = self.transform(d) end = time.time() - print(f"==> Execution time : {end - start:.4f} seconds") + print(f"==> Execution time full python code : {end - start:.4f} seconds") self.TypeCode = 1 print("Use ", self.TabNameCode[self.TypeCode], " code:") @@ -590,14 +692,14 @@ def eval_computation_time(self): start = time.time() w = self.transform(d) end = time.time() - print(f"==> Execution time : {end - start:.4f} seconds") + print(f"==> Execution time C++ binding : {end - start:.4f} seconds") self.TypeCode = 2 print("Use ", self.TabNameCode[self.TypeCode], " code:") start = time.time() w = self.transform(d) end = time.time() - print(f"==> Execution time : {end - start:.4f} seconds") + print(f"==> Execution time C++ binary: {end - start:.4f} seconds") ################################ END CLASS ###################### @@ -691,8 +793,12 @@ def mrs_uwtrecons(Tmap, lmax=None, opt=None, verbose=False, path="./", progpath= # Wavelet filtering - -def spline2(size, l, lc): +# Define the spline function in l-space +def spline(l): + return (1/12) * (np.abs(l - 2)**3 - 4*np.abs(l - 1)**3 + + 6*np.abs(l)**3 - 4*np.abs(l + 1)**3 + + np.abs(l + 2)**3) +def spline2(size, l=1, lc=1): """ Compute a non-negative decreasing spline, with value 1 at index 0. @@ -701,7 +807,8 @@ def spline2(size, l, lc): size: int size of the spline l: float - spline parameter + spline parameter. If l=1, the function goes to zero at l = size + If l=2, the function goes to zero at l = size / 2 lc: float spline parameter @@ -710,21 +817,10 @@ def spline2(size, l, lc): np.ndarray (size,) float array, spline """ - res = np.arange(0, size + 1) res = 2 * l * res / (lc * size) - res = ( - (3 / 2) - * 1 - / 12 - * ( - abs(res - 2) ** 3 - - 4 * abs(res - 1) ** 3 - + 6 * abs(res) ** 3 - - 4 * abs(res + 1) ** 3 - + abs(res + 2) ** 3 - ) - ) + res = ((3 / 2) * 1 / 12 * ( abs(res - 2)** 3 - 4*abs(res - 1)**3 + + 6*abs(res)**3 - 4*abs(res + 1)**3 + abs(res + 2)**3)) return res @@ -776,6 +872,301 @@ def compute_g(size, lc): return g +def plot_filter(T, x=None, title="Spherical wavelet Filters", xlabel="l", ylabel="Amplitude", legend_prefix="Scale"): + """ + Plot a series of curves from a 2D array T. + + Parameters: + - T: 2D numpy array of shape (n_points, n_curves) + - x: Optional 1D array of length n_points for the x-axis values + - title: Title of the plot + - xlabel: Label for the x-axis + - ylabel: Label for the y-axis + - legend_prefix: Prefix for the legend entries + """ + T = np.asarray(T) + + if x is None: + x = np.arange(T.shape[0]) + + if len(x) != T.shape[0]: + raise ValueError("Length of x must match number of rows in T.") + + plt.figure(figsize=(10, 6)) + + for i in range(T.shape[1]): + z = T[:, i] + plt.plot(x, z / z.max(), label=f"{legend_prefix} {i+1}") + + plt.title(title) + plt.xlabel(xlabel) + plt.ylabel(ylabel) + plt.legend() + plt.grid(True) + plt.tight_layout() + plt.show() + +def plotsig(T, x=None, title="Spherical wavelet Filters", xlabel="X", ylabel="T[:, i]", legend_prefix="Scale"): + """ + Plot a series of curves from a 2D array T. + + Parameters: + - T: 2D numpy array of shape (n_points, n_curves) + - x: Optional 1D array of length n_points for the x-axis values + - title: Title of the plot + - xlabel: Label for the x-axis + - ylabel: Label for the y-axis + - legend_prefix: Prefix for the legend entries + """ + T = np.asarray(T) + + if x is None: + x = np.arange(T.shape[0]) + + if len(x) != T.shape[0]: + raise ValueError("Length of x must match number of rows in T.") + + plt.figure(figsize=(10, 6)) + + plt.plot(x, T / T.max(), label=f"{legend_prefix}") + + plt.title(title) + plt.xlabel(xlabel) + plt.ylabel(ylabel) + plt.legend() + plt.grid(True) + plt.tight_layout() + plt.show() + + +def ploth(lmax, x=None, title="h filter", xlabel="l", ylabel="T[:, i]", legend_prefix="H filter"): + """ + Plot a series of curves from a 2D array T. + + Parameters: + - T: 2D numpy array of shape (n_points, n_curves) + - x: Optional 1D array of length n_points for the x-axis values + - title: Title of the plot + - xlabel: Label for the x-axis + - ylabel: Label for the y-axis + - legend_prefix: Prefix for the legend entries + """ + + + x = np.arange(0, lmax+1) + size=1 + + # tab2 = spline2(size, lc, 1) + h = compute_h(lmax, 1) + a = splinelmax2sigma(lmax, True) * 60. / 180. * np.pi + g = gaussian_l(x, a / 60. / 180. * np.pi, 1) + + plt.figure(figsize=(10, 6)) + plt.plot(x, h, label="{legend_prefix}") + plt.plot(x, g, '--', label=f"Est Gaussian fit\nσ °", lw=2) + + plt.title(title) + plt.xlabel(xlabel) + plt.ylabel(ylabel) + plt.legend() + plt.grid(True) + plt.tight_layout() + plt.show() + + +def amin_pixel_size(nside): + res_rad = hp.nside2resol(nside) # en radians + res_deg = np.degrees(res_rad) # en degrés + res_arcmin = res_deg * 60 # en arcminutes + return res_arcmin + + + +# Gaussian beam in l-space +def gaussian_l(l, sigma, A): + return A * np.exp(-0.5 * l * (l + 1) * sigma**2) + + +def splinelmax2sigma(lmax, hfilter=False): + sig3000 = 3.81594630 # (amin)) : gaussian fit done at lmax = 3000 + sig = 3000. / lmax * sig3000 + if hfilter: + sig = sig * 1.771 + return sig + +def sigma2splinelmax(sigma, hfilter): + sig3000 = 3.81594630 # (amin)) : gaussian fit done at lmax = 3000 + lmax = 3000. / sigma * sig3000 + if hfilter: + lmax = lmax * 1.771 + return lmax + +# TabSigma=np.array([1.,2,3,4,8,12,20,30]) +def safe_divide(a,b , eps=1e-8): + b_safe = np.where(np.abs(b) > eps, b, np.inf) # Replace near-zero with ∞ to avoid divide-by-zero + result = a / b_safe + result[np.abs(b) <= eps] = 0 # Explicitly set output to 0 where b was near-zero + return result + +def get_sigmafilters(TabSigma, lmax, Phi0Spline=False): + nscales = TabSigma.shape[0] + TabPhi = np.zeros((lmax+1, nscales)) + TabPsi = np.zeros((lmax+1, nscales)) + Tabh = np.zeros((lmax+1, nscales)) + Tabg = np.zeros((lmax+1, nscales)) + 1. + if Phi0Spline: + TabPhi[:,0]= spline2(lmax, 1) + else: + TabPhi[:,0]= 1 + for j in range(1,nscales): + lm = sigma2splinelmax( TabSigma[j], False) + TabPhi[:,j] = spline2(lmax, lmax/lm) + TabPsi[:,j-1] = TabPhi[:,j-1] - TabPhi[:,j] + # print("Scale ", j+1, " Sigma = ", TabSigma[j], ", lm = ", lm) + Tabh [:,j-1] = safe_divide(TabPhi[:,j], TabPhi[:,j-1]) + Tabg [:,j-1] = 1. - Tabh [:,j-1] + TabPsi[:,nscales-1] = TabPhi[:,nscales-1] + Tabh[:,nscales-1] = Tabh [:,nscales-2] + Tabg[:,nscales-1] = Tabh [:,nscales-2] + return TabPhi, TabPsi, Tabh, Tabg + +def get_lmaxfilters(Tablmax): + nscales = Tablmax.shape[0] + lmax = Tablmax[0] + nl = Tablmax[0]+1 + TabPhi = np.zeros((nl, nscales)) + TabPsi = np.zeros((nl, nscales)) + Tabh = np.zeros((nl, nscales)) + Tabg = np.zeros((nl, nscales)) + 1. + TabPhi[:,0]= sigma2splinelmax( TabSigma[0], False) + for j in range(1,nscales): + lm = sigma2splinelmax( TabSigma[j-1], False) + # print("Scale ", j+1, " Sigma = ", TabSigma[j-1], ", lm = ", lm) + TabPhi[:,j] = spline2(lmax, lmax/lm) + TabPsi[:,j-1] = TabPhi[:,j-1] - TabPhi[:,j] + Tabh [:,j-1] = TabPhi[:,j] / (TabPhi[:,j-1] + 1e-6) + Tabg [:,j-1] = 1. - Tabh [:,j-1] + TabPsi[:,nscales-1] = TabPhi[:,nscales-1] + return TabPhi, TabPsi, Tabh, Tabg + + +def get_default_filters(nside, nscales, lmax=None): + if lmax is None: + lmax = 3 * nside + Tablmax = np.zeros((nscales)) + lm = lmax + for j in range(0,nscales): + Tablmax[j] = lm + lm = lm / 2 + TabSigma = splinelmax2sigma(Tablmax) + tl = sigma2splinelmax( TabSigma[0], True) + TabPhi, TabPsi, Tabh, Tabg = get_sigmafilters(TabSigma, lmax, True) + return Tablmax, TabSigma, TabPhi, TabPsi, Tabh, Tabg + +def wt_hfilter_trans(Map, Tabh): + nside = hp.get_nside(Map) + npix = Map.shape[0] + lmax = Tabh.shape[0] - 1 + nscales = Tabh.shape[1] + wts = np.zeros((nscales,npix)) + h_scale = np.copy(Map) + alms = map2alm(h_scale, lmax=lmax) + AlmsHF = np.copy(alms) + for j in range(nscales-1): + h = Tabh[:,j] + AlmLF = alm_product(AlmsHF, h) + # AlmLF = alm_product(alms, h) # there a bug in the c++ code. Uncommenting this line reproduces the bug. + l_scale = alm2map(AlmLF, nside) + coef = h_scale - l_scale + h_scale[:] = l_scale[:] + AlmsHF[:] = AlmLF[:] + wts[j,:] = coef + wts[nscales-1,:] = l_scale + return wts + +def wt_phi_filter_trans(Map, TabFilter): + nside = hp.get_nside(Map) + npix = Map.shape[0] + lmax = TabFilter.shape[0] - 1 + nscales = TabFilter.shape[1] + wts = np.zeros((nscales,npix)) + # print("WTS: ", nscales) + h_scale = np.copy(Map) + alms = map2alm(h_scale, lmax=lmax) + for j in range(nscales-1): + h = TabFilter[:,j+1] + AlmLF = alm_product(alms, h) + l_scale = alm2map(AlmLF, nside) + wts[j,:] = h_scale - l_scale + h_scale[:] = l_scale[:] + wts[nscales-1,:] = l_scale + return wts + +def convol_filter(Map, Filter): + nside = hp.get_nside(Map) + npix = Map.shape[0] + lmax = Filter.shape[0] - 1 + alms = map2alm(Map, lmax=lmax) + AlmCoef = alm_product(alms, Filter) + return alm2map(AlmCoef, nside) + +def test_wt_hfilter_trans(): + nside=1024 + nscales=5 + d = np.random.normal(size=(nside**2 * 12)) + Tablmax, TabSigma, TabPhi, TabPsi, Tabh, Tabg = get_default_filters(nside, nscales) + + w = wt_hfilter_trans(d, Tabh) + w1 = wt_trans(d, nscales=4) + return wts + + + +def get_sigma_from_spline(lmax, hfilter=False): + # Range of l values (spherical harmonics degrees) + l_vals = np.arange(0, lmax+1) + # res = 2 * l * l_vals / lmax + # s_vals = spline(res) + if hfilter is False: + s_vals = spline2(lmax, 1, 1) + else: + s_vals = compute_h(lmax, 1) + + # Fit Gaussian to spline + popt, _ = curve_fit(gaussian_l, l_vals, s_vals, p0=(0.01, 1.0)) + sigma_est, A_est = popt + + # Print result + print(f"Estimated angular σ ≈ {np.degrees(sigma_est):.8f} deg (≈ {sigma_est:.5f} rad)") + print(f"Estimated amplitude A ≈ {A_est:.4f}") + print(f"Estimated angular σ ≈ {np.degrees(sigma_est)*60.:.8f} amin (≈ {sigma_est:.5f} rad)") + + a = splinelmax2sigma(lmax, hfilter) + print(f"Linear Estimated angular σ ≈ {a:.4f} amin") + + sigma_rad = sigma_est + sigma_amin = np.degrees(sigma_rad)*60. + + # Compute FWHM + fwhm_rad = np.sqrt(8 * np.log(2)) * sigma_rad + fwhm_amin = np.degrees(fwhm_rad) * 60. + + print(f"Sigma (rad): {sigma_rad:.6f}") + print(f"FWHM (rad): {fwhm_rad:.6f}") + print(f"FWHM (amin): {fwhm_amin:.3f}") + + # Plot + plt.plot(l_vals, s_vals, label="Spline($\ell$)", lw=2) + plt.plot(l_vals, gaussian_l(l_vals, *popt), '--', label=f"Gaussian fit\nσ ≈ {np.degrees(sigma_est):.2f}°", lw=2) + plt.plot(l_vals, gaussian_l(l_vals, a / 60. / 180. * np.pi, 1), '--', label=f"Est Gaussian fit\nσ ≈ {np.degrees(sigma_est):.2f}°", lw=2) + plt.xlabel("$\ell$") + plt.ylabel("Amplitude") + plt.legend() + plt.grid(True) + plt.title("Spline($\ell$) vs Gaussian Beam Fit") + plt.show() + + def get_wt_filters(lmax, nscales): """Compute wavelet filters. @@ -865,15 +1256,24 @@ def wt_trans(inputs, nscales=3, lmax=None, alm_in=False, nside=None, alm_out=Fal npix = np.shape(alms)[1] wts = np.zeros((np.shape(maps)[0], npix, nscales + 1), dtype="complex") + t = get_wt_filters(lmax, nscales) + t1 = np.copy(t) scale = 1 + alm_high = np.copy(alms) for j in range(nscales): + # print("scale ", j+1) h = compute_h(lmax, scale) + t1[:,j] = h + d = h - t[:,j+1] + info(d) if not alm_out: - m = alm2map(alm_product(alms, h), nside) + alm_low = alm_product(alm_high, h) + m = alm2map(alm_low, nside) else: - m = alm_product(alms, h) + m = alm_product(alm_high, h) h_scale = l_scale - m l_scale = m + alm_high[:] = alm_low[:] if dim_inputs == 1: wts[:, j] = h_scale else: @@ -972,6 +1372,6 @@ def f(x): if __name__ == "__main__": print("Main :)") - testclass = 1 + testclass = 0 if testclass: C = test_mrs_class()