Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
aos_api_client.egg-info
build
dist
*/.pytest_cache
*/.pytest_cache
*/__pycache__
3 changes: 3 additions & 0 deletions aos/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
# LICENSE file at http://www.apstra.com/eula
import logging
import requests

from .aos import AosRestAPI, AosAuth
from .blueprint import AosBlueprint
from .devices import AosDevices
from .design import AosDesign
from .resources import AosResources
from .external_systems import AosExternalSystems
from .telemetry import AosTelemetryManager
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for both imports? line 8 and 15

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the one in line 8


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -53,3 +55,4 @@ def __init__(
self.design = AosDesign(self.rest)
self.resources = AosResources(self.rest)
self.external_systems = AosExternalSystems(self.rest)
self.telemetry_mgr = AosTelemetryManager(self.rest)
4 changes: 2 additions & 2 deletions aos/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ def iter_all(self) -> Generator[SystemAgent, None, None]:
for s in system_agents.get("items", []):
yield SystemAgent.from_json(s)

def get_agent_by_id(self, system_id: str) -> Optional[System]:
return System.from_json(
def get_agent_by_id(self, system_id: str) -> Optional[SystemAgent]:
return SystemAgent.from_json(
self.rest.json_resp_get(f"/api/system-agents/{system_id}")
)

Expand Down
131 changes: 131 additions & 0 deletions aos/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright 2020-present, Apstra, Inc. All rights reserved.
#
# This source code is licensed under End User License Agreement found in the
# LICENSE file at http://www.apstra.com/eula
from dataclasses import dataclass
from typing import List

from requests import Response
from aos.aos import AosSubsystem


@dataclass
class AosTelemetryEndpointStatus:
"""
Represent the AosTelemetry Endpoint.
This is used to build out the Endpoint structure
"""

connected: bool
connection_log: List[dict]
connection_time: str
last_tx_time: str
epoch: str
connection_reset_count: int
dns_log: List[dict]
disconnection_time: str

@classmethod
def from_json(cls, d: dict) -> "AosTelemetryEndpointStatus":
return AosTelemetryEndpointStatus(
connected=d.get("connected"),
connection_log=d.get("connectionLog"),
connection_time=d.get("connectionTime"),
last_tx_time=d.get("lastTransmitedTime"),
epoch=d.get("epoch"),
connection_reset_count=d.get("connectionResetCount"),
dns_log=d.get("dnsLog"),
disconnection_time=d.get("disconnectionTime"),
)


@dataclass
class AosTelemetryEndpoint:
"""
Represents the AosTelemetryEndpoint
"""

id: str
host: str
port: int
streaming_type: str
protocol: str
sequencing_mode: str
ep_status: AosTelemetryEndpointStatus

@classmethod
def from_json(cls, d: dict) -> "AosTelemetryEndpoint":
return AosTelemetryEndpoint(
id=d.get("id"),
host=d.get("host"),
port=d.get("port"),
streaming_type=d.get("streaming_type"),
protocol=d.get("protocol"),
sequencing_mode=d.get("sequencing_mode"),
ep_status=AosTelemetryEndpointStatus.from_json(d.get("status")),
)


class AosTelemetryManager(AosSubsystem):
"""
Telemetry manager class used to manage the telemetry endpoints
"""

def add_endpoint(
self,
host: str,
port: int,
streaming_type: str,
protocol: str = "protoBufOverTcp",
mode: str = "sequenced",
) -> Response:
"""
Parameters
----------
host
(str) AOS server url or ip address
port
(int) AOS server port (ex 80, 443)
streaming_type
(str) Type of telemetry to stream (alerts/events/perfmon)
protocol
(str) Protocol for data default is protoBufOverTcp
mode
(str) sequenced/unsequenced. default is sequenced

"""
body = {
"hostname": host,
"port": port,
"streaming_type": streaming_type,
"sequencing_mode": mode,
"protocol": protocol,
}
return self.rest.json_resp_post(uri="/api/streaming-config", data=body)

def get_endpoints(self) -> List[AosTelemetryEndpoint]:
"""
Get the existing streaming endpoints as AosTelemetryEndpoint objects
"""
r = self.rest.json_resp_get(uri="/api/streaming-config")
return [AosTelemetryEndpoint.from_json(i) for i in r.get("items")]

def delete_endpoint(self, id: str) -> Response:
"""
Delete a single endpoint
Parameters
----------
id
(str) id of the endpoint
"""
return self.rest.delete(uri="/api/streaming-config/" + id)

def delete_all_endpoints(self) -> List[bool]:
"""
Cycle through and delete all the streaming endpoints.
"""
eps = self.get_endpoints()
ret = []
for ep in eps:
ret.append(self.delete_endpoint(ep.id))
return True
755 changes: 755 additions & 0 deletions scripts/telemetry/aosstream_pb2.py

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions scripts/telemetry/listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import socket
import aosstream_pb2
import sys

"""
This is a simple single-threaded listener.
Use 0.0.0.0 to listen on the local host.
pick a port and pick alerts, events or perfmon
"""
l = len(sys.argv)
if l <= 3:
print ("Usage : listener.py host port [alerts|events|perfmon]")
exit()

if l > 3 : listen_to=sys.argv[3]
if l > 2 : port = int(sys.argv[2])
if l > 1 : host = sys.argv[1]


def parse_message(data):
pos = 0
sm = aosstream_pb2.AosSequencedMessage()
sm.ParseFromString(data)
m = aosstream_pb2.AosMessage()
m.ParseFromString(sm.aos_proto)
print (m)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((host, port))
print(f"Listening on {host} and {port}")
s.listen()
conn, addr = s.accept()
with conn:
print(f"Connected by {addr}")
print("recompiled")
while (True):
try:
data = conn.recv(2)
l = int.from_bytes(data, "big")
data = conn.recv(l)
print ("DATA RECV")
print (data)
if (not data):
break
parse_message(data)
except Exception as e:
print (e)
27 changes: 27 additions & 0 deletions scripts/telemetry/start_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import aos
from aos.client import AosClient
import urllib3
import json
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


# You will need to update the connection details below with your
# specific AOS instance
AOS_IP = ""
AOS_PORT = 0
AOS_USER = ""
AOS_PW = ""

# Where is the listener listening?
LISTENER_IP = ""
ALERTS_PORT = 64420
EVENTS_PORT = 64421
PERFMON_PORT = 64422

# Login
aos = AosClient(protocol="https", host=AOS_IP, port=AOS_PORT)
aos.auth.login(AOS_USER, AOS_PW)
aos.telemetry_mgr.add_endpoint("100.123.0.8", ALERTS_PORT, "alerts")
aos.telemetry_mgr.add_endpoint("100.123.0.8", EVENTS_PORT, "events")
aos.telemetry_mgr.add_endpoint("100.123.0.8", PERFMON_PORT, "perfmon")
print(aos.telemetry_mgr.get_endpoints())
24 changes: 24 additions & 0 deletions scripts/telemetry/stop_all_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import aos
from aos.client import AosClient
import urllib3
import json
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


# You will need to update the connection details below with your
# specific AOS instance
AOS_IP = "66.129.234.206"
AOS_PORT = 37000
AOS_USER = "admin"
AOS_PW = "admin"

# Where is the listener listening?
LISTENER_IP = "100.123.0.8"
ALERTS_PORT = 64429
EVENTS_PORT = 64428
PERFMON_PORT = 64427

# Login
aos = AosClient(protocol="https", host=AOS_IP, port=AOS_PORT)
aos.auth.login(AOS_USER, AOS_PW)
aos.telemetry_mgr.delete_all_endpoints()
113 changes: 113 additions & 0 deletions tests/fixtures/aos/4.0.0/telemetry/endpoints.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{
"items": [
{
"status": {
"connectionLog": [
{
"date": "2022-05-09T15:54:34.909380+00:00",
"message": "Eof encountered"
},
{
"date": "2022-05-09T15:54:39.910426+00:00",
"message": "Eof encountered"
},
{
"date": "2022-05-09T15:54:44.911002+00:00",
"message": "Eof encountered"
}
],
"connectionTime": null,
"lastTransmittedTime": "2022-05-09T15:54:35.121503+00:00",
"epoch": "2022-05-09T15:54:34.909126+00:00",
"connectionResetCount": 3,
"streamingEndpoint": {
"hostname": "100.123.0.8",
"protocol": "protoBufOverTcp",
"port": 64429
},
"dnsLog": [],
"connected": false,
"disconnectionTime": "2022-05-09T15:54:44.910995+00:00"
},
"streaming_type": "alerts",
"sequencing_mode": "sequenced",
"protocol": "protoBufOverTcp",
"hostname": "100.123.0.8",
"port": 64429,
"id": "9ebce4cc-8119-4e2d-b080-789cbbe57d32"
},
{
"status": {
"connectionLog": [
{
"date": "2022-05-09T15:54:35.121511+00:00",
"message": "Eof encountered"
},
{
"date": "2022-05-09T15:54:40.122113+00:00",
"message": "Eof encountered"
},
{
"date": "2022-05-09T15:54:45.122693+00:00",
"message": "Eof encountered"
}
],
"connectionTime": null,
"lastTransmittedTime": "2022-05-09T15:54:41.128922+00:00",
"epoch": "2022-05-09T15:54:35.121228+00:00",
"connectionResetCount": 3,
"streamingEndpoint": {
"hostname": "100.123.0.8",
"protocol": "protoBufOverTcp",
"port": 64427
},
"dnsLog": [],
"connected": false,
"disconnectionTime": "2022-05-09T15:54:45.122685+00:00"
},
"streaming_type": "perfmon",
"sequencing_mode": "sequenced",
"protocol": "protoBufOverTcp",
"hostname": "100.123.0.8",
"port": 64427,
"id": "47c455f5-59ad-4582-9608-aa6c09398385"
},
{
"status": {
"connectionLog": [
{
"date": "2022-05-09T15:54:35.013046+00:00",
"message": "Eof encountered"
},
{
"date": "2022-05-09T15:54:40.013578+00:00",
"message": "Eof encountered"
},
{
"date": "2022-05-09T15:54:45.014154+00:00",
"message": "Eof encountered"
}
],
"connectionTime": null,
"lastTransmittedTime": "2022-05-09T15:54:45.122669+00:00",
"epoch": "2022-05-09T15:54:35.012796+00:00",
"connectionResetCount": 3,
"streamingEndpoint": {
"hostname": "100.123.0.8",
"protocol": "protoBufOverTcp",
"port": 64428
},
"dnsLog": [],
"connected": false,
"disconnectionTime": "2022-05-09T15:54:45.014146+00:00"
},
"streaming_type": "events",
"sequencing_mode": "sequenced",
"protocol": "protoBufOverTcp",
"hostname": "100.123.0.8",
"port": 64428,
"id": "d8f5a012-232c-4aca-b5c9-637dbba8cb4b"
}
]
}

Loading