Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions cloudinit/reporting/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import os.path
import time

from . import instantiated_handler_registry
from . import instantiated_handler_registry, available_handlers

FINISH_EVENT_TYPE = 'finish'
START_EVENT_TYPE = 'start'
Expand Down Expand Up @@ -81,17 +81,32 @@ def as_dict(self):
return data


def report_event(event, excluded_handler_types=None):
    """Report an event to all registered event handlers
    except those whose type is in excluded_handler_types.

    This should generally be called via one of the other functions in
    the reporting module.

    :param event:
        The event object to publish; it is passed unchanged to the
        publish_event() method of every handler that is not excluded.
    :param excluded_handler_types:
        Optional iterable of handler type names (keys of
        available_handlers) that must not receive the event. None or an
        empty iterable excludes nothing. A single string is treated as
        one type name.
    """
    if not excluded_handler_types:
        excluded_types = set()
    elif isinstance(excluded_handler_types, str):
        # Guard against substring matching ("log" in "syslog") when a
        # caller passes a bare name instead of a collection of names.
        excluded_types = {excluded_handler_types}
    else:
        # Materialise once so one-shot iterables can be tested repeatedly.
        excluded_types = set(excluded_handler_types)

    # Translate the excluded type names into the handler classes
    # registered under those names.
    excluded_handler_classes = {
        hndl_cls
        for hndl_type, hndl_cls in available_handlers.registered_items.items()
        if hndl_type in excluded_types
    }

    for handler in instantiated_handler_registry.registered_items.values():
        # Exact class comparison: an instance of a subclass of an excluded
        # class is still notified.
        if type(handler) in excluded_handler_classes:
            continue  # skip this excluded handler
        handler.publish_event(event)


Expand Down
12 changes: 8 additions & 4 deletions cloudinit/reporting/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class HyperVKvpReportingHandler(ReportingHandler):
https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
"""
HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
# The maximum value size expected in Azure
Comment thread
Moustafa-Moustafa marked this conversation as resolved.
HV_KVP_AZURE_MAX_VALUE_SIZE = 1024
HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
Expand Down Expand Up @@ -195,7 +197,8 @@ def _iterate_kvps(self, offset):
def _event_key(self, event):
"""
the event key format is:
CLOUD_INIT|<incarnation number>|<event_type>|<event_name>|<time>
CLOUD_INIT|<incarnation number>|<event_type>|<event_name>|<uuid>
[|subevent_index]
"""
return u"{0}|{1}|{2}|{3}".format(self.event_key_prefix,
event.event_type, event.name,
Expand Down Expand Up @@ -249,13 +252,14 @@ def _break_down(self, key, meta_data, description):
data_without_desc = json.dumps(meta_data,
separators=self.JSON_SEPARATORS)
room_for_desc = (
self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE -
self.HV_KVP_AZURE_MAX_VALUE_SIZE -
len(data_without_desc) - 8)
value = data_without_desc.replace(
message_place_holder,
'"{key}":"{desc}"'.format(
key=self.MSG_KEY, desc=des_in_json[:room_for_desc]))
result_array.append(self._encode_kvp_item(key, value))
subkey = "{}|{}".format(key, i)
result_array.append(self._encode_kvp_item(subkey, value))
i += 1
des_in_json = des_in_json[room_for_desc:]
if len(des_in_json) == 0:
Expand All @@ -282,7 +286,7 @@ def _encode_event(self, event):
# if it reaches the maximum length of kvp value,
# break it down to slices.
# this should be very corner case.
if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE:
if len(value) > self.HV_KVP_AZURE_MAX_VALUE_SIZE:
return self._break_down(key, meta_data, event.description)
else:
data = self._encode_kvp_item(key, value)
Expand Down
12 changes: 8 additions & 4 deletions cloudinit/sources/DataSourceAzure.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
report_diagnostic_event,
EphemeralDHCPv4WithReporting,
is_byte_swapped,
dhcp_log_cb)
dhcp_log_cb,
push_log_to_kvp)

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -789,9 +790,12 @@ def _negotiate(self):

@azure_ds_telemetry_reporter
def activate(self, cfg, is_new_instance):
    """Run the ephemeral-disk resize handling, then push the log to KVP.

    :param cfg: Not used by this method.
    :param is_new_instance:
        Forwarded to address_ephemeral_resize() as ``is_new_instance``.
    """
    try:
        address_ephemeral_resize(
            is_new_instance=is_new_instance,
            preserve_ntfs=self.ds_cfg.get(DS_CFG_KEY_PRESERVE_NTFS, False))
    finally:
        # Runs whether or not the resize step raised, so the log is
        # pushed on the failure path as well.
        push_log_to_kvp(self.sys_cfg['def_log_file'])

@property
Expand Down
58 changes: 56 additions & 2 deletions cloudinit/sources/helpers/azure.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This file is part of cloud-init. See LICENSE file for license information.

import base64
import json
import logging
import os
Expand All @@ -8,7 +8,9 @@
import struct
import time
import textwrap
import zlib

from cloudinit.settings import CFG_BUILTIN
from cloudinit.net import dhcp
from cloudinit import stages
from cloudinit import temp_utils
Expand All @@ -33,7 +35,14 @@
BOOT_EVENT_TYPE = 'boot-telemetry'
SYSTEMINFO_EVENT_TYPE = 'system-info'
DIAGNOSTIC_EVENT_TYPE = 'diagnostic'

COMPRESSED_EVENT_TYPE = 'compressed'
# Maximum number of bytes of the cloud-init.log file that can be dumped to KVP
# at once. This number is based on an analysis of a large sample of
# cloud-init.log files, where the P95 of the file sizes was 537KB and the time
# taken to dump a 500KB file was (P95: 76, P99: 233, P99.9: 1170) ms.
MAX_LOG_TO_KVP_LENGTH = 512000
Comment thread
Moustafa-Moustafa marked this conversation as resolved.
# Marker file to indicate whether cloud-init.log is pushed to KVP
LOG_PUSHED_TO_KVP_MARKER_FILE = '/var/lib/cloud/data/log_pushed_to_kvp'
azure_ds_reporter = events.ReportEventStack(
name="azure-ds",
description="initialize reporter for azure ds",
Expand Down Expand Up @@ -177,6 +186,49 @@ def report_diagnostic_event(str):
return evt


def report_compressed_event(event_name, event_content):
    """Report event_content as a compressed, base64-encoded event.

    The content is compressed with zlib.compress(), base64-encoded and
    wrapped in a JSON document of the form
    ``{"encoding": "gz+b64", "data": <ascii text>}``, which becomes the
    description of a ReportingEvent of type COMPRESSED_EVENT_TYPE.

    :param event_name: Name given to the reported event.
    :param event_content: Bytes to compress and report.
    :return: The event that was reported (returned for unit testing).
    """
    encoded = base64.encodebytes(zlib.compress(event_content)).decode('ascii')
    payload = json.dumps({"encoding": "gz+b64", "data": encoded})
    evt = events.ReportingEvent(
        COMPRESSED_EVENT_TYPE, event_name, payload,
        events.DEFAULT_EVENT_ORIGIN)
    # Handlers registered under these type names do not receive the event.
    events.report_event(
        evt, excluded_handler_types={"log", "print", "webhook"})
    return evt


@azure_ds_telemetry_reporter
def push_log_to_kvp(file_name=CFG_BUILTIN['def_log_file']):
    """Push the tail of the cloud-init log file to KVP, at most once.

    If LOG_PUSHED_TO_KVP_MARKER_FILE already exists, a diagnostic event
    is reported and nothing else happens. Otherwise the last
    MAX_LOG_TO_KVP_LENGTH bytes of the file (or the whole file when it is
    shorter) are reported as a compressed event and the marker file is
    written. Any exception raised while doing so is reported as a
    diagnostic event instead of being propagated.

    :param file_name: Path of the log file to read.
    """
    if os.path.isfile(LOG_PUSHED_TO_KVP_MARKER_FILE):
        report_diagnostic_event("cloud-init.log is already pushed to KVP")
        return

    LOG.debug("Dumping cloud-init.log file to KVP")
    try:
        with open(file_name, "rb") as log_fp:
            # Seek to the end to learn the file size, then back up by at
            # most MAX_LOG_TO_KVP_LENGTH bytes.
            log_fp.seek(0, os.SEEK_END)
            file_size = log_fp.tell()
            start = max(file_size - MAX_LOG_TO_KVP_LENGTH, 0)
            report_diagnostic_event(
                "Dumping last {} bytes of cloud-init.log file to KVP".format(
                    file_size - start))
            log_fp.seek(start, os.SEEK_SET)
            report_compressed_event("cloud-init.log", log_fp.read())
        # Written only after a successful push, so a failed attempt can
        # be retried by a later call.
        util.write_file(LOG_PUSHED_TO_KVP_MARKER_FILE, '')
    except Exception as ex:
        report_diagnostic_event("Exception when dumping log file: %s" %
                                repr(ex))


@contextmanager
def cd(newdir):
prevdir = os.getcwd()
Expand Down Expand Up @@ -474,6 +526,8 @@ def build_report(

@azure_ds_telemetry_reporter
def _post_health_report(self, document):
push_log_to_kvp()

# Whenever report_diagnostic_event(diagnostic_msg) is invoked in code,
# the diagnostic messages are written to special files
# (/var/opt/hyperv/.kvp_pool_*) as Hyper-V KVP messages.
Expand Down
74 changes: 71 additions & 3 deletions tests/unittests/test_reporting_hyperv.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# This file is part of cloud-init. See LICENSE file for license information.
import base64
import zlib

from cloudinit.reporting import events
from cloudinit.reporting.handlers import HyperVKvpReportingHandler
from cloudinit.reporting import events, instantiated_handler_registry
from cloudinit.reporting.handlers import HyperVKvpReportingHandler, LogHandler

import json
import os
Expand Down Expand Up @@ -72,7 +74,7 @@ def test_file_operation_issue(self):
def test_event_very_long(self):
reporter = HyperVKvpReportingHandler(
kvp_file_path=self.tmp_file_path)
description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
description = 'ab' * reporter.HV_KVP_AZURE_MAX_VALUE_SIZE
long_event = events.FinishReportingEvent(
'event_name',
description,
Expand Down Expand Up @@ -199,6 +201,72 @@ def test_report_diagnostic_event(self):
if "test_diagnostic" not in evt_msg:
raise AssertionError("missing expected diagnostic message")

def test_report_compressed_event(self):
    """A compressed event reaches the KVP handler and round-trips."""
    kvp_handler = HyperVKvpReportingHandler(
        kvp_file_path=self.tmp_file_path)
    payload = b'test_compressed'
    try:
        instantiated_handler_registry.register_item("telemetry", kvp_handler)
        azure.report_compressed_event("compressed event", payload)

        # Exactly one compressed KVP is expected, carrying the payload.
        self.validate_compressed_kvps(kvp_handler, 1, [payload])
    finally:
        instantiated_handler_registry.unregister_item(
            "telemetry", force=False)

@mock.patch.object(LogHandler, 'publish_event')
def test_push_log_to_kvp(self, publish_event):
    """The log tail is pushed to KVP once and never to the LogHandler.

    push_log_to_kvp() is called twice: the first call must push the last
    MAX_LOG_TO_KVP_LENGTH bytes as a compressed event, the second must
    be skipped because the marker file exists by then.
    """
    reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
    log_file = self.tmp_path("cloud-init.log")
    marker_file = self.tmp_path('log_pushed_to_kvp')
    max_length = 100
    log_content = "A" * 50 + "B" * 100
    extra_content = "C" * 10

    # Patch the module-level settings instead of assigning to them, so
    # the original values are restored and do not leak into other tests.
    limit_patch = mock.patch.object(
        azure, 'MAX_LOG_TO_KVP_LENGTH', max_length)
    marker_patch = mock.patch.object(
        azure, 'LOG_PUSHED_TO_KVP_MARKER_FILE', marker_file)
    try:
        instantiated_handler_registry.register_item("telemetry", reporter)
        with limit_patch, marker_patch:
            with open(log_file, "w") as f:
                f.write(log_content)
            azure.push_log_to_kvp(log_file)

            # The second call happens after the marker file was written,
            # so the appended content must not be pushed.
            with open(log_file, "a") as f:
                f.write(extra_content)
            azure.push_log_to_kvp(log_file)

        # The mocked LogHandler must never have seen a compressed event.
        for call_arg in publish_event.call_args_list:
            event = call_arg[0][0]
            self.assertNotEqual(
                event.event_type, azure.COMPRESSED_EVENT_TYPE)
        self.validate_compressed_kvps(
            reporter, 1,
            [log_content[-max_length:].encode()])
    finally:
        instantiated_handler_registry.unregister_item("telemetry",
                                                      force=False)

def validate_compressed_kvps(self, reporter, count, values):
    """Assert that the reporter's KVPs hold the expected compressed events.

    :param reporter: Handler whose queue is joined and whose KVPs are read.
    :param count: Expected number of compressed events.
    :param values:
        Expected decompressed payloads (bytes), in the order the
        compressed events appear among the KVPs.
    """
    # Wait until every queued event has been processed.
    reporter.q.join()
    kvps = list(reporter._iterate_kvps(0))
    compressed_count = 0
    for kvp in kvps:
        kvp_value_json = json.loads(kvp['value'])
        evt_msg = kvp_value_json["msg"]
        evt_type = kvp_value_json["type"]
        if evt_type != azure.COMPRESSED_EVENT_TYPE:
            continue
        # The message of a compressed event is itself a JSON document.
        evt_msg_json = json.loads(evt_msg)
        evt_encoding = evt_msg_json["encoding"]
        evt_data = zlib.decompress(
            base64.decodebytes(evt_msg_json["data"].encode("ascii")))

        # Fail with an assertion rather than an IndexError when more
        # compressed events are found than expected values were given.
        self.assertLess(compressed_count, len(values))
        self.assertEqual(evt_data, values[compressed_count])
        self.assertEqual(evt_encoding, "gz+b64")
        compressed_count += 1
    self.assertEqual(compressed_count, count)

def test_unique_kvp_key(self):
reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
evt1 = events.ReportingEvent(
Expand Down