diff --git a/framework/python/src/common/mqtt.py b/framework/python/src/common/mqtt.py index e63af52ce..32cb421eb 100644 --- a/framework/python/src/common/mqtt.py +++ b/framework/python/src/common/mqtt.py @@ -13,6 +13,7 @@ # limitations under the License. """MQTT client""" +import json import typing as t import paho.mqtt.client as mqtt_client from common import logger @@ -55,4 +56,6 @@ def send_message(self, topic: str, message: t.Union[str, dict]) -> None: message (t.Union[str, dict]): message """ self._connect() + if isinstance(message, dict): + message = json.dumps(message) self._client.publish(topic, str(message)) diff --git a/framework/python/src/common/session.py b/framework/python/src/common/session.py index 795506083..08e5dda95 100644 --- a/framework/python/src/common/session.py +++ b/framework/python/src/common/session.py @@ -723,6 +723,9 @@ def detect_network_adapters_change(self) -> dict: if 'items_removed' in diff: adapters['adapters_removed'] = diff['items_removed'] # Save new network interfaces to session - LOGGER.debug(f'Network adapters changed {adapters}') + LOGGER.debug(f'Network adapters change detected: {adapters}') self._ifaces = ifaces_new return adapters + + def get_ifaces(self): + return self._ifaces diff --git a/framework/python/src/common/tasks.py b/framework/python/src/common/tasks.py index 8fb6ca24b..c71742ced 100644 --- a/framework/python/src/common/tasks.py +++ b/framework/python/src/common/tasks.py @@ -15,7 +15,7 @@ from contextlib import asynccontextmanager import datetime - +import logging from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import FastAPI @@ -24,6 +24,7 @@ # Check adapters period seconds CHECK_NETWORK_ADAPTERS_PERIOD = 5 +INTERNET_CONNECTION_TOPIC = 'events/internet' NETWORK_ADAPTERS_TOPIC = 'events/adapter' LOGGER = logger.get_logger('tasks') @@ -39,6 +40,8 @@ def __init__( self._mqtt_client = self._testrun.get_mqtt_client() local_tz = datetime.datetime.now().astimezone().tzinfo self._scheduler = AsyncIOScheduler(timezone=local_tz) + # Prevent scheduler warnings + self._scheduler._logger.setLevel(logging.ERROR) @asynccontextmanager async def start(self, app: FastAPI): # pylint: disable=unused-argument @@ -47,15 +50,24 @@ async def start(self, app: FastAPI): # pylint: disable=unused-argument Args: app (FastAPI): app instance """ - # job that checks for changes in network adapters + # Job that checks for changes in network adapters self._scheduler.add_job( func=self._testrun.get_net_orc().network_adapters_checker, kwargs={ - 'mgtt_client': self._mqtt_client, + 'mqtt_client': self._mqtt_client, 'topic': NETWORK_ADAPTERS_TOPIC }, trigger='interval', seconds=CHECK_NETWORK_ADAPTERS_PERIOD, ) + self._scheduler.add_job( + func=self._testrun.get_net_orc().internet_conn_checker, + kwargs={ + 'mqtt_client': self._mqtt_client, + 'topic': INTERNET_CONNECTION_TOPIC + }, + trigger='interval', + seconds=CHECK_NETWORK_ADAPTERS_PERIOD, + ) self._scheduler.start() yield diff --git a/framework/python/src/core/testrun.py b/framework/python/src/core/testrun.py index c8138fb99..6f6baf642 100644 --- a/framework/python/src/core/testrun.py +++ b/framework/python/src/core/testrun.py @@ -114,7 +114,7 @@ def __init__(self, # Start websockets server self.start_ws() - # MQTT client + # Init MQTT client self._mqtt_client = mqtt.MQTT() if self._no_ui: diff --git a/framework/python/src/net_orc/ip_control.py b/framework/python/src/net_orc/ip_control.py index 890277963..abefe6b09 100644 --- a/framework/python/src/net_orc/ip_control.py +++ b/framework/python/src/net_orc/ip_control.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. """IP Control Module""" - import psutil import typing as t from common import logger @@ -102,7 +101,7 @@ def get_iface_port_stats(self, iface): def get_namespaces(self): result = util.run_command('ip netns list') - #Strip ID's from the namespace results + # Strip ID's from the namespace results namespaces = re.findall(r'(\S+)(?:\s+\(id: \d+\))?', result[0]) return namespaces @@ -241,6 +240,14 @@ def configure_container_interface(self, return False return True + def ping_via_gateway(self, host): + """Ping the host trough the gateway container""" + command = f'docker exec tr-ct-gateway ping -W 1 -c 1 {host}' + output = util.run_command(command) + if '0% packet loss' in output[0]: + return True + return False + @staticmethod def get_sys_interfaces() -> t.Dict[str, t.Dict[str, str]]: """ Retrieves all Ethernet network interfaces from the host system diff --git a/framework/python/src/net_orc/network_orchestrator.py b/framework/python/src/net_orc/network_orchestrator.py index 27af4e1c7..b5bc995bf 100644 --- a/framework/python/src/net_orc/network_orchestrator.py +++ b/framework/python/src/net_orc/network_orchestrator.py @@ -24,7 +24,7 @@ import time import traceback from docker.types import Mount -from common import logger, util +from common import logger, util, mqtt from net_orc.listener import Listener from net_orc.network_event import NetworkEvent from net_orc.network_validator import NetworkValidator @@ -550,10 +550,6 @@ def _start_network_service(self, net_module): cap_add=['NET_ADMIN'], name=net_module.container_name, hostname=net_module.container_name, - # Undetermined version of docker seems to have broken - # DNS configuration (/etc/resolv.conf) Re-add when/if - # this network is utilized and DNS issue is resolved - #network=PRIVATE_DOCKER_NET, network_mode='none', privileged=True, detach=True, @@ -789,17 +785,46 @@ def restore_net(self): def get_session(self): return self._session - def network_adapters_checker(self, mgtt_client, topic): + def network_adapters_checker(self, mqtt_client: mqtt.MQTT, topic: str): """Checks for changes in network adapters and sends a message to the frontend """ try: adapters = self._session.detect_network_adapters_change() if adapters: - mgtt_client.send_message(topic, adapters) + mqtt_client.send_message(topic, adapters) except Exception: LOGGER.error(traceback.format_exc()) + def internet_conn_checker(self, mqtt_client: mqtt.MQTT, topic: str): + """Checks internet connection and sends a status to frontend""" + + # Default message + message = {'connection': False} + + # Only check if Testrun is running + if self.get_session().get_status() not in [ + 'Waiting for Device', 'Monitoring', 'In Progress' + ]: + message['connection'] = None + + # Only run if single intf mode not used + elif 'single_intf' not in self._session.get_runtime_params(): + iface = self._session.get_internet_interface() + + # Check that an internet intf has been selected + if iface and iface in self._session.get_ifaces(): + + # Ping google.com from gateway container + internet_connection = self._ip_ctrl.ping_via_gateway( + 'google.com') + + if internet_connection: + message['connection'] = True + + # Broadcast via MQTT client + mqtt_client.send_message(topic, message) + class NetworkModule: """Define all the properties of a Network Module""" diff --git a/modules/ui/src/app/app.component.html b/modules/ui/src/app/app.component.html index 122e8b129..b1341a58d 100644 --- a/modules/ui/src/app/app.component.html +++ b/modules/ui/src/app/app.component.html @@ -116,6 +116,14 @@

Testrun

"> tune + + + diff --git a/modules/ui/src/app/app.component.scss b/modules/ui/src/app/app.component.scss index 20e81c53e..9639f5cc0 100644 --- a/modules/ui/src/app/app.component.scss +++ b/modules/ui/src/app/app.component.scss @@ -208,3 +208,9 @@ app-version { display: flex; justify-content: center; } + +.separator { + width: 1px; + height: 28px; + background-color: $light-grey; +} diff --git a/modules/ui/src/app/app.component.spec.ts b/modules/ui/src/app/app.component.spec.ts index b626123d7..e54eece26 100644 --- a/modules/ui/src/app/app.component.spec.ts +++ b/modules/ui/src/app/app.component.spec.ts @@ -71,6 +71,7 @@ import { LiveAnnouncer } from '@angular/cdk/a11y'; import { HISTORY } from './mocks/reports.mock'; import { TestRunMqttService } from './services/test-run-mqtt.service'; import { MOCK_ADAPTERS } from './mocks/settings.mock'; +import { WifiComponent } from './components/wifi/wifi.component'; import { MatTooltipModule } from '@angular/material/tooltip'; const windowMock = { @@ -123,7 +124,10 @@ describe('AppComponent', () => { 'focusFirstElementInContainer', ]); mockLiveAnnouncer = jasmine.createSpyObj('mockLiveAnnouncer', ['announce']); - mockMqttService = jasmine.createSpyObj(['getNetworkAdapters']); + mockMqttService = jasmine.createSpyObj([ + 'getNetworkAdapters', + 'getInternetConnection', + ]); TestBed.configureTestingModule({ imports: [ @@ -139,6 +143,7 @@ describe('AppComponent', () => { CalloutComponent, MatIconTestingModule, CertificatesComponent, + WifiComponent, MatTooltipModule, ], providers: [ @@ -441,6 +446,13 @@ describe('AppComponent', () => { expect(version).toBeTruthy(); }); + it('should internet icon', () => { + fixture.detectChanges(); + const internet = compiled.querySelector('app-wifi'); + + expect(internet).toBeTruthy(); + }); + describe('Callout component visibility', () => { describe('with no connection settings', () => { beforeEach(() => { diff --git a/modules/ui/src/app/app.component.ts b/modules/ui/src/app/app.component.ts index 2214b8927..e80f04ad0 100644 --- a/modules/ui/src/app/app.component.ts +++ b/modules/ui/src/app/app.component.ts @@ -83,6 +83,7 @@ export class AppComponent { this.appStore.getReports(); this.appStore.getTestModules(); this.appStore.getNetworkAdapters(); + this.appStore.getInternetConnection(); this.matIconRegistry.addSvgIcon( 'devices', this.domSanitizer.bypassSecurityTrustResourceUrl(DEVICES_LOGO_URL) diff --git a/modules/ui/src/app/app.module.ts b/modules/ui/src/app/app.module.ts index 92a5f7b34..795d4e0d8 100644 --- a/modules/ui/src/app/app.module.ts +++ b/modules/ui/src/app/app.module.ts @@ -49,6 +49,7 @@ import { ShutdownAppComponent } from './components/shutdown-app/shutdown-app.com import { WindowProvider } from './providers/window.provider'; import { CertificatesComponent } from './pages/certificates/certificates.component'; import { LOADER_TIMEOUT_CONFIG_TOKEN } from './services/loaderConfig'; +import { WifiComponent } from './components/wifi/wifi.component'; import { MqttModule, IMqttServiceOptions } from 'ngx-mqtt'; @@ -87,6 +88,7 @@ export const MQTT_SERVICE_OPTIONS: IMqttServiceOptions = { ShutdownAppComponent, CertificatesComponent, MqttModule.forRoot(MQTT_SERVICE_OPTIONS), + WifiComponent, ], providers: [ WindowProvider, diff --git a/modules/ui/src/app/app.store.spec.ts b/modules/ui/src/app/app.store.spec.ts index 0130e9b40..7d114e8be 100644 --- a/modules/ui/src/app/app.store.spec.ts +++ b/modules/ui/src/app/app.store.spec.ts @@ -85,7 +85,10 @@ describe('AppStore', () => { mockFocusManagerService = jasmine.createSpyObj([ 'focusFirstElementInContainer', ]); - mockMqttService = jasmine.createSpyObj(['getNetworkAdapters']); + mockMqttService = jasmine.createSpyObj([ + 'getNetworkAdapters', + 'getInternetConnection', + ]); TestBed.configureTestingModule({ providers: [ @@ -162,6 +165,7 @@ describe('AppStore', () => { isMenuOpen: true, interfaces: {}, settingMissedError: null, + hasInternetConnection: null, }); done(); }); @@ -303,5 +307,19 @@ describe('AppStore', () => { ); }); }); + + describe('getInternetConnection', () => { + it('should update store', done => { + mockMqttService.getInternetConnection.and.returnValue( + of({ connection: false }) + ); + appStore.getInternetConnection(); + + appStore.viewModel$.pipe(take(1)).subscribe(store => { + expect(store.hasInternetConnection).toEqual(false); + done(); + }); + }); + }); }); }); diff --git a/modules/ui/src/app/app.store.ts b/modules/ui/src/app/app.store.ts index 562d2618e..3584ec8d1 100644 --- a/modules/ui/src/app/app.store.ts +++ b/modules/ui/src/app/app.store.ts @@ -16,7 +16,7 @@ import { Injectable } from '@angular/core'; import { ComponentStore } from '@ngrx/component-store'; -import { tap } from 'rxjs/operators'; +import { tap, withLatestFrom } from 'rxjs/operators'; import { selectError, selectHasConnectionSettings, @@ -55,12 +55,16 @@ export const CONSENT_SHOWN_KEY = 'CONSENT_SHOWN'; export interface AppComponentState { consentShown: boolean; isStatusLoaded: boolean; + hasInternetConnection: boolean | null; systemStatus: TestrunStatus | null; } @Injectable() export class AppStore extends ComponentStore { private consentShown$ = this.select(state => state.consentShown); private isStatusLoaded$ = this.select(state => state.isStatusLoaded); + private hasInternetConnection$ = this.select( + state => state.hasInternetConnection + ); private hasDevices$ = this.store.select(selectHasDevices); private hasRiskProfiles$ = this.store.select(selectHasRiskProfiles); private reports$ = this.store.select(selectReports); @@ -85,6 +89,7 @@ export class AppStore extends ComponentStore { isMenuOpen: this.isMenuOpen$, interfaces: this.interfaces$, settingMissedError: this.settingMissedError$, + hasInternetConnection: this.hasInternetConnection$, }); updateConsent = this.updater((state, consentShown: boolean) => ({ @@ -97,6 +102,13 @@ export class AppStore extends ComponentStore { isStatusLoaded, })); + updateHasInternetConnection = this.updater( + (state, hasInternetConnection: boolean | null) => ({ + ...state, + hasInternetConnection, + }) + ); + setContent = this.effect(trigger$ => { return trigger$.pipe( tap(() => { @@ -158,6 +170,19 @@ export class AppStore extends ComponentStore { ); }); + getInternetConnection = this.effect(trigger$ => { + return trigger$.pipe( + exhaustMap(() => { + return this.testRunMqttService.getInternetConnection().pipe( + withLatestFrom(this.hasInternetConnection$), + tap(([{ connection }]) => { + this.updateHasInternetConnection(connection); + }) + ); + }) + ); + }); + private notifyAboutTheAdapters(adapters: SystemInterfaces) { this.notificationService.notify( `New network adapter(s) ${Object.keys(adapters).join(', ')} has been detected. You can switch to using it in the System settings menu` @@ -225,6 +250,7 @@ export class AppStore extends ComponentStore { consentShown: sessionStorage.getItem(CONSENT_SHOWN_KEY) !== null, isStatusLoaded: false, systemStatus: null, + hasInternetConnection: null, }); } } diff --git a/modules/ui/src/app/components/wifi/wifi.component.html b/modules/ui/src/app/components/wifi/wifi.component.html new file mode 100644 index 000000000..c93d05f7e --- /dev/null +++ b/modules/ui/src/app/components/wifi/wifi.component.html @@ -0,0 +1,25 @@ + + diff --git a/modules/ui/src/app/components/wifi/wifi.component.scss b/modules/ui/src/app/components/wifi/wifi.component.scss new file mode 100644 index 000000000..bc0ac542e --- /dev/null +++ b/modules/ui/src/app/components/wifi/wifi.component.scss @@ -0,0 +1,40 @@ +/** + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@import '../../../theming/colors'; + +$icon-size: 24px; + +.app-toolbar-button { + border-radius: 20px; + border: 1px solid transparent; + min-width: 48px; + padding: 0; + box-sizing: border-box; + height: 34px; + margin: 11px 0; + line-height: 50% !important; + &.disabled { + opacity: 0.6; + } +} + +.wifi-icon { + margin-right: 0; + width: $icon-size; + font-size: $icon-size; + color: $dark-grey; + height: $icon-size; +} diff --git a/modules/ui/src/app/components/wifi/wifi.component.spec.ts b/modules/ui/src/app/components/wifi/wifi.component.spec.ts new file mode 100644 index 000000000..55e85a6a7 --- /dev/null +++ b/modules/ui/src/app/components/wifi/wifi.component.spec.ts @@ -0,0 +1,100 @@ +/** + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { ComponentFixture, TestBed } from '@angular/core/testing'; + +import { WifiComponent } from './wifi.component'; + +describe('WifiComponent', () => { + let component: WifiComponent; + let fixture: ComponentFixture; + let compiled: HTMLElement; + + beforeEach(async () => { + await TestBed.configureTestingModule({ + imports: [WifiComponent], + }).compileComponents(); + + fixture = TestBed.createComponent(WifiComponent); + component = fixture.componentInstance; + compiled = fixture.nativeElement as HTMLElement; + fixture.detectChanges(); + }); + + it('should create', () => { + expect(component).toBeTruthy(); + }); + + describe('Class tests', () => { + describe('with internet connection', () => { + it('should return label', () => { + expect(component.getLabel(true)).toEqual( + 'Testrun detects a working internet connection for the device under test.' + ); + }); + }); + + describe('with no internet connection', () => { + it('should return label', () => { + expect(component.getLabel(false)).toEqual( + 'No internet connection detected for the device under test.' + ); + }); + }); + + describe('with N/A internet connection', () => { + it('should return label', () => { + expect(component.getLabel(false, true)).toEqual( + 'Internet connection is not being monitored.' + ); + }); + }); + }); + + describe('DOM tests', () => { + describe('with internet connection', () => { + it('should have wifi icon', () => { + component.on = true; + fixture.detectChanges(); + + const icon = compiled.querySelector('mat-icon')?.textContent?.trim(); + + expect(icon).toEqual('wifi'); + }); + }); + + describe('should have no wifi icon', () => { + it('should have no wifi icon', () => { + component.on = false; + fixture.detectChanges(); + + const icon = compiled.querySelector('mat-icon')?.textContent?.trim(); + + expect(icon).toEqual('wifi_off'); + }); + }); + + it('button should be disabled', () => { + component.disable = true; + fixture.detectChanges(); + + const shutdownButton = compiled.querySelector( + '.wifi-button' + ) as HTMLButtonElement; + + expect(shutdownButton?.classList.contains('disabled')).toBeTrue(); + }); + }); +}); diff --git a/modules/ui/src/app/components/wifi/wifi.component.ts b/modules/ui/src/app/components/wifi/wifi.component.ts new file mode 100644 index 000000000..e7e28e8f9 --- /dev/null +++ b/modules/ui/src/app/components/wifi/wifi.component.ts @@ -0,0 +1,40 @@ +/** + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { Component, Input } from '@angular/core'; +import { MatIcon } from '@angular/material/icon'; +import { MatTooltip } from '@angular/material/tooltip'; +import { MatButton, MatIconButton } from '@angular/material/button'; + +@Component({ + selector: 'app-wifi', + standalone: true, + imports: [MatIcon, MatTooltip, MatButton, MatIconButton], + templateUrl: './wifi.component.html', + styleUrl: './wifi.component.scss', +}) +export class WifiComponent { + @Input() on: boolean | null = null; + @Input() disable: boolean = false; + + getLabel(on: boolean | null, disable: boolean = false) { + if (disable) { + return 'Internet connection is not being monitored.'; + } + return on + ? 'Testrun detects a working internet connection for the device under test.' + : 'No internet connection detected for the device under test.'; + } +} diff --git a/modules/ui/src/app/mocks/topic.mock.ts b/modules/ui/src/app/mocks/topic.mock.ts new file mode 100644 index 000000000..4309ae84f --- /dev/null +++ b/modules/ui/src/app/mocks/topic.mock.ts @@ -0,0 +1,5 @@ +import { InternetConnection } from '../model/topic'; + +export const MOCK_INTERNET: InternetConnection = { + connection: false, +}; diff --git a/modules/ui/src/app/model/topic.ts b/modules/ui/src/app/model/topic.ts index 31cd8b1a7..955b65f59 100644 --- a/modules/ui/src/app/model/topic.ts +++ b/modules/ui/src/app/model/topic.ts @@ -1,3 +1,8 @@ export enum Topic { NetworkAdapters = 'events/adapter', + InternetConnection = 'events/internet', +} + +export interface InternetConnection { + connection: boolean | null; } diff --git a/modules/ui/src/app/services/test-run-mqtt.service.spec.ts b/modules/ui/src/app/services/test-run-mqtt.service.spec.ts index 637c441a4..f8fd07142 100644 --- a/modules/ui/src/app/services/test-run-mqtt.service.spec.ts +++ b/modules/ui/src/app/services/test-run-mqtt.service.spec.ts @@ -7,6 +7,7 @@ import SpyObj = jasmine.SpyObj; import { of } from 'rxjs'; import { MOCK_ADAPTERS } from '../mocks/settings.mock'; import { Topic } from '../model/topic'; +import { MOCK_INTERNET } from '../mocks/topic.mock'; describe('TestRunMqttService', () => { let service: TestRunMqttService; @@ -26,7 +27,7 @@ describe('TestRunMqttService', () => { expect(service).toBeTruthy(); }); - describe('', () => { + describe('getNetworkAdapters', () => { beforeEach(() => { mockService.observe.and.returnValue(of(getResponse(MOCK_ADAPTERS))); }); @@ -46,6 +47,28 @@ describe('TestRunMqttService', () => { }); }); + describe('getInternetConnection', () => { + beforeEach(() => { + mockService.observe.and.returnValue(of(getResponse(MOCK_INTERNET))); + }); + + it('should subscribe the topic', done => { + service.getInternetConnection().subscribe(() => { + expect(mockService.observe).toHaveBeenCalledWith( + Topic.InternetConnection + ); + done(); + }); + }); + + it('should return object of type', done => { + service.getInternetConnection().subscribe(res => { + expect(res).toEqual(MOCK_INTERNET); + done(); + }); + }); + }); + function getResponse(response: Type): IMqttMessage { const enc = new TextEncoder(); const message = enc.encode(JSON.stringify(response)); diff --git a/modules/ui/src/app/services/test-run-mqtt.service.ts b/modules/ui/src/app/services/test-run-mqtt.service.ts index c362f11e8..3483f77b6 100644 --- a/modules/ui/src/app/services/test-run-mqtt.service.ts +++ b/modules/ui/src/app/services/test-run-mqtt.service.ts @@ -3,7 +3,7 @@ import { IMqttMessage, MqttService } from 'ngx-mqtt'; import { catchError, Observable, of } from 'rxjs'; import { map } from 'rxjs/operators'; import { Adapters } from '../model/setting'; -import { Topic } from '../model/topic'; +import { InternetConnection, Topic } from '../model/topic'; @Injectable({ providedIn: 'root', @@ -15,6 +15,10 @@ export class TestRunMqttService { return this.topic(Topic.NetworkAdapters); } + getInternetConnection(): Observable { + return this.topic(Topic.InternetConnection); + } + private topic(topicName: string): Observable { return this.mqttService.observe(topicName).pipe( map(