Skip to content

Commit 97fef3c

Browse files
docs: ✨ Add new Telemetry Demo code example
1 parent e04cfa1 commit 97fef3c

File tree

17 files changed

+11484
-0
lines changed

17 files changed

+11484
-0
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Container image for the EdgeConnect telemetry demo collector.
FROM ubuntu:20.04

# Copy the telemetry application into the image.
RUN mkdir /app
COPY ./app/ec-telemetry /app/ec-telemetry
RUN chmod +x /app/ec-telemetry

# Install Python and network troubleshooting tools (ping, dig) in a single
# layer: running `apt-get update` in the same RUN as the installs guarantees
# the package index is never stale relative to the install commands (a
# separate `RUN apt-get update` layer can be cached independently and break
# later installs). Removing the apt lists afterwards keeps the image small.
RUN set -xe \
    && apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y \
        python3-pip \
        iputils-ping \
        dnsutils \
    && rm -rf /var/lib/apt/lists/*

# Install the application's Python dependencies.
RUN pip install --upgrade pip \
    && pip install -r /app/ec-telemetry/requirements.txt
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
# THIS DEMO IS NOT MEANT TO BE A PRODUCTION MONITORING SOLUTION
2+
3+
# This example code serves to demonstrate the possibilities in
4+
# retrieving and visualizing telemetry data from Orchestrator and
5+
# EdgeConnect. The demo is simple to run, however, modifying
6+
# and/or incorporating components of this work into your own solution
7+
# requires a greater understanding of Python, Docker, InfluxDB,
8+
# Grafana, and Redis.
9+
10+
# All of these components are not necessarily required for developing
11+
# your own solution as there may be pieces of data you don't need
# to collect, others that you want to add, or you may want to
# substitute alternative tools for your task queuing, database,
# visualization, or alerting needs.
15+
16+
import json
17+
import logging
18+
import os
19+
import time
20+
from logging.handlers import RotatingFileHandler
21+
22+
import redis
23+
from influxdb_client import InfluxDBClient
24+
from rq import Queue
25+
from rq.registry import FailedJobRegistry, ScheduledJobRegistry
26+
27+
from pyedgeconnect import Orchestrator
28+
29+
# Define Redis instance and Queue to connect to.
# Host "redis" is presumably the compose/Docker service name for the Redis
# container — confirm against the deployment files.
r = redis.Redis(host="redis", port=6379)
# RQ queue "ectelem": failed jobs are retained for 3600s (failure_ttl) for
# inspection; successful job results are discarded immediately (result_ttl=0)
# since workers persist their output elsewhere rather than returning it.
q = Queue("ectelem", connection=r, failure_ttl=3600, result_ttl=0)
32+
33+
# Setup log settings for messages and errors.
# NOTE(review): LOG_MAX_BYTES and LOG_MAX_BACKUPS are required — int(None)
# raises TypeError if either variable is unset; confirm the container
# environment always provides them.
log_max_bytes = int(os.getenv("LOG_MAX_BYTES"))
log_max_backups = int(os.getenv("LOG_MAX_BACKUPS"))
log_level = os.getenv("LOG_LEVEL")
logger = logging.getLogger(__name__)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
local_log_directory = "logging/"
if not os.path.exists(local_log_directory):
    os.makedirs(local_log_directory)
# Rotating file handler so the demo's log cannot grow without bound.
log_file_handler = RotatingFileHandler(
    f"{local_log_directory}ec-telemetry.log",
    maxBytes=log_max_bytes,
    backupCount=log_max_backups,
)
log_file_handler.setFormatter(formatter)
# Set logging severity level from environment variable.
# BUG FIX: the original if/elif chain referenced logging.CRTICAL (typo),
# which raised AttributeError whenever LOG_LEVEL=CRITICAL; a lookup table
# avoids both the typo and the repetition. It also compared with
# `log_level == None` instead of `is None`.
_log_levels = {
    "CRITICAL": logging.CRITICAL,
    "ERROR": logging.ERROR,
    "WARNING": logging.WARNING,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
}
if log_level in _log_levels:
    logger.setLevel(_log_levels[log_level])
elif log_level is None:
    # No LOG_LEVEL provided: disable this logger entirely (original behavior).
    logger.disabled = True
logger.addHandler(log_file_handler)
62+
63+
# Map environment variables for InfluxDB and Orchestrator connectivity.
# Any of these will be None if the corresponding variable is unset.
db_url = os.getenv("DB_URL")  # InfluxDB base URL
db_token = os.getenv("DB_TOKEN")  # InfluxDB auth token
db_org = os.getenv("DB_ORG")  # InfluxDB organization
db_bucket = os.getenv("DB_BUCKET")  # InfluxDB bucket name
orch_url = os.getenv("ORCH_URL")  # Orchestrator URL
orch_api_key = os.getenv("ORCH_API_KEY")  # Orchestrator API key
71+
# On first run, empty previous task queues
72+
# Empty Failed Queue
73+
failed_registry = FailedJobRegistry(queue=q)
74+
for job_id in failed_registry.get_job_ids():
75+
failed_registry.remove(job_id, delete_job=True)
76+
# Empty Main Queue
77+
main_registry = ScheduledJobRegistry(queue=q)
78+
for job_id in main_registry.get_job_ids():
79+
main_registry.remove(job_id, delete_job=True)
80+
81+
# Main collection loop: verify Orchestrator/InfluxDB reachability, discover
# appliances, then enqueue one telemetry-collection job per appliance for
# the RQ workers to process.
while True:

    # Start log
    logger.critical("*** --- *** --- EC TELEMETRY NEW QUEUE STARTING --- *** --- ***")

    # Instantiate Orchestrator for discovering appliances
    orch = Orchestrator(
        orch_url,
        api_key=orch_api_key,
        verify_ssl=False,
    )

    logger.debug(f"CONFIGURED ORCH: {orch_url}")
    logger.debug(f"CONFIGURED DB: {db_url}")

    # Check reachability to Orchestrator
    orch_confirm_auth = orch.get_orchestrator_hello()
    # Check reachability to InfluxDB with a short-lived test client
    db_test_client = InfluxDBClient(
        url=db_url,
        token=db_token,
        org=db_org,
    )
    db_health = db_test_client.health()

    # Validate response checks for Orchestrator and InfluxDB
    if (
        orch_confirm_auth != "There was an internal server error."
        and orch_confirm_auth is not False
    ):
        logger.debug("ORCH REACHABLE")
        if db_health.status == "pass":
            logger.debug("DB REACHABLE")
            ready_to_retrieve = True
        else:
            logger.critical("DB UNREACHABLE -- NO DATA RETRIEVED/PROCESSED")
            logger.error(db_health)
            ready_to_retrieve = False

    else:
        logger.critical(
            "ORCH AUTH FAILED OR UNREACHABLE -- NO DATA RETRIEVED/PROCESSED"
        )
        ready_to_retrieve = False

    # Close test connection to InfluxDB.
    # BUG FIX: the original invoked db_test_client.__del__() directly;
    # calling the finalizer by hand is incorrect — use the public close().
    db_test_client.close()

    # If Orchestrator and InfluxDB are reachable,
    # begin telemetry collection
    if ready_to_retrieve:

        # Get appliances and determine reachability
        logger.debug("Retrieving appliance data from Orchestrator")
        appliances = orch.get_appliances()
        appliance_state_time = int(time.time())

        # Get interface labels from Orchestrator and map the interface
        # labels into a flat dictionary
        logger.debug("Retrieving interface label data from Orchestrator")
        orch_int_labels = orch.get_all_interface_labels()
        interface_labels = {}
        for label in orch_int_labels["wan"]:
            interface_labels[label] = orch_int_labels["wan"][label]["name"]
        for label in orch_int_labels["lan"]:
            interface_labels[label] = orch_int_labels["lan"][label]["name"]

        # Get overlay ids from Orchestrator and map the overlay ids into
        # a flat dictionary
        logger.debug("Retrieving overlay data from Orchestrator")
        overlays = orch.get_all_overlays_config()
        overlay_ids = {}
        for overlay in overlays:
            overlay_ids[str(overlay["id"])] = overlay["name"]

        # Get appliance licensing information from Orchestrator
        logger.debug("Retrieving appliance licensing data from Orchestrator")
        licensing = orch.get_portal_licensed_appliances()

        # Limit appliances to submit to job queue if any are specified
        # in the limit_appliances.json file
        limit_filename = "/app/ec-telemetry/limit_appliances.json"
        with open(limit_filename) as limit_file:
            appliance_subset = json.load(limit_file)["appliance_subset"]

        # Create flat list of all appliance hostnames in Orchestrator
        # to reverse check against appliances listed in
        # limit_appliances.json file to log error if appliance
        # referenced that does not exist
        all_appliance_hostnames = []
        for appliance in appliances:
            all_appliance_hostnames.append(appliance["hostName"])

        appliance_set = []
        # If appliances are listed in the limit file, filter only
        # appliances with matching hostnames to job queue.
        if len(appliance_subset) > 0:
            for appliance in appliances:
                if appliance["hostName"] in appliance_subset:
                    appliance_set.append(appliance)
            for appliance in appliance_subset:
                if appliance not in all_appliance_hostnames:
                    logger.error(
                        f"No appliance with hostname {appliance} found in Orchestrator, please update limit_appliances.json contents"
                    )
        # If no appliances are listed in the limit file, add the first
        # four currently reachable appliances to the queue and add their
        # hostnames to the limit file
        else:
            num_random_appliances = 4
            logger.warning("No appliances listed in limit_appliances.json file")
            random_appliances = []
            while len(appliance_set) < num_random_appliances:
                for appliance in appliances:
                    if len(appliance_set) == num_random_appliances:
                        break
                    # state == 1 is treated as "currently reachable" here —
                    # NOTE(review): confirm against the Orchestrator API docs.
                    elif appliance["state"] == 1:
                        appliance_set.append(appliance)
                        logger.critical(
                            f"adding {appliance['hostName']} to process list"
                        )
                        logger.critical(
                            f"Total appliances to be collected: {len(appliance_set)}"
                        )
                        random_appliances.append(appliance["hostName"])

                # Fewer reachable appliances than requested: proceed with
                # what was found rather than spinning forever.
                if (
                    len(appliance_set) > 0
                    and len(appliance_set) < num_random_appliances
                ):
                    logger.critical(
                        f"{len(appliance_set)} appliances were currently reachable from Orchestrator"
                    )
                    break
                elif len(appliance_set) == 0:
                    logger.critical(
                        "No appliances were currently reachable from Orchestrator"
                    )
                    break

            # Persist the auto-selected hostnames so subsequent runs use
            # the same appliance subset.
            if len(appliance_set) != 0:
                new_appliance_subset = {}
                new_appliance_subset["appliance_subset"] = random_appliances
                with open(limit_filename, "w") as limit_file:
                    json.dump(new_appliance_subset, limit_file)
                logger.warning(
                    f"Proceeding collection for these reachable appliances {random_appliances}"
                )
                logger.warning(
                    "Please update limit_appliances.json to change which appliances telemetry is collected from"
                )

        if len(appliance_set) > 0:
            # Append licensing information into appliance list
            for appliance in appliance_set:
                for license_item in licensing:
                    if license_item["applianceId"] == appliance["id"]:
                        appliance["license_display"] = license_item["licenses"]["fx"][
                            "tier"
                        ]["display"]
                        appliance["license_bw"] = license_item["licenses"]["fx"][
                            "tier"
                        ]["bandwidth"]

            # Append interface mapping and overlay mapping to appliance list
            # before submitting appliances to job queue
            for appliance in appliance_set:
                appliance["interface_labels_map"] = interface_labels
                appliance["overlay_id_map"] = overlay_ids
                appliance["time_retrieved"] = appliance_state_time

            logger.info(f"SENDING APPLIANCES TO QUEUE: {len(appliance_set)}")

            # Queue each appliance object for workers to process
            # (enqueue's Job return value was unused, so it is not bound)
            for appliance in appliance_set:
                q.enqueue(
                    "ec_telemetry.ec_data_gather",
                    appliance,
                )
        else:
            logger.critical(
                "No appliances added to collection queue, waiting 60 sec until next attempt"
            )
        # If appliances have been queued, wait 60 seconds until next run
        time.sleep(60)

    # If not ready to retrieve (Orchestrator or Influxdb fail
    # reachability check), try again in 3 seconds
    else:
        time.sleep(3)

0 commit comments

Comments
 (0)