Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions src/azure-cli-telemetry/azure/cli/telemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
DEFAULT_INSTRUMENTATION_KEY = 'c4395b75-49cc-422c-bc95-c7d51aef5d46'


def _start(config_dir):
def _start(config_dir, cache_dir):
from azure.cli.telemetry.components.telemetry_logging import get_logger

logger = get_logger('process')

args = [sys.executable, os.path.realpath(__file__), config_dir]
logger.info('Creating upload process: "%s %s %s"', *args)
args = [sys.executable, os.path.realpath(__file__), config_dir, cache_dir]
logger.info('Creating upload process: "%s %s %s %s"', *args)

kwargs = {'args': args}
if os.name == 'nt':
Expand All @@ -42,8 +42,8 @@ def _start(config_dir):
kwargs['stdout'] = subprocess.DEVNULL
kwargs['stderr'] = subprocess.STDOUT

subprocess.Popen(**kwargs)
logger.info('Return from creating process')
process = subprocess.Popen(**kwargs)
logger.info('Return from creating process %s', process.pid)


def save(config_dir, payload):
Expand Down Expand Up @@ -74,38 +74,34 @@ def save(config_dir, payload):
logger.info("Split cli events and extra events failure: %s", str(ex))
cli_payload = payload

if save_payload(config_dir, cli_payload):
cache_dir = save_payload(config_dir, cli_payload)
if cache_dir:
logger.info('Begin creating telemetry upload process.')
_start(config_dir)
_start(config_dir, cache_dir)
logger.info('Finish creating telemetry upload process.')


def main():
from azure.cli.telemetry.components.telemetry_note import TelemetryNote
from azure.cli.telemetry.components.records_collection import RecordsCollection
from azure.cli.telemetry.components.telemetry_client import CliTelemetryClient
from azure.cli.telemetry.components.telemetry_logging import config_logging_for_upload, get_logger

try:
config_dir = sys.argv[1]
cache_dir = sys.argv[2]
config_logging_for_upload(config_dir)

logger = get_logger('main')
logger.info('Attempt start. Configuration directory [%s].', sys.argv[1])
logger.info('Attempt start. Configuration directory [%s]. Cache directory [%s].', sys.argv[1], sys.argv[2])

try:
with TelemetryNote(config_dir) as telemetry_note:
telemetry_note.touch()
collection = RecordsCollection(cache_dir)
collection.snapshot_and_read()

collection = RecordsCollection(telemetry_note.get_last_sent(), config_dir)
collection.snapshot_and_read()

client = CliTelemetryClient()
for each in collection:
client.add(each, flush=True)
client.flush(force=True)

telemetry_note.update_telemetry_note(collection.next_send)
client = CliTelemetryClient()
for each in collection:
client.add(each, flush=True)
client.flush(force=True)
except portalocker.AlreadyLocked:
# another upload process is running.
logger.info('Lock out from note file under %s which means another process is running. Exit 0.', config_dir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,29 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import datetime
import os
import shutil
import stat
import tempfile


class RecordsCollection:
def __init__(self, last_sent, config_dir):
def __init__(self, cache_dir):
from azure.cli.telemetry.components.telemetry_logging import get_logger

self._last_sent = last_sent
self._next_send = last_sent
self._records = []
self._logger = get_logger('records')
self._config_dir = config_dir
self._cache_dir = cache_dir

def __iter__(self):
return self._records.__iter__()

@property
def next_send(self):
return self._next_send

def snapshot_and_read(self):
""" Scan the telemetry cache files and move all the rotated files to a temp directory. """
from azure.cli.telemetry.const import TELEMETRY_CACHE_DIR

folder = os.path.join(self._config_dir, TELEMETRY_CACHE_DIR)
if not os.path.isdir(folder):
""" Scan the telemetry cache files. """
if not os.path.isdir(self._cache_dir):
return

# Collect all cache/cache.x files
candidates = [(fn, os.stat(os.path.join(folder, fn))) for fn in os.listdir(folder)]
candidates = [(fn, os.stat(os.path.join(self._cache_dir, fn))) for fn in os.listdir(self._cache_dir)]

# sort the cache files base on their last modification time.
candidates = [(fn, file_stat) for fn, file_stat in candidates if stat.S_ISREG(file_stat.st_mode)]
Expand All @@ -46,27 +35,15 @@ def snapshot_and_read(self):
self._logger.info('No cache to be uploaded.')
return

tmp = tempfile.mkdtemp()
self._logger.info('%d cache files to move.', len(candidates))
self._logger.info('Create temp folder %s', tmp)

for each in candidates:
if stat.S_ISREG(each[1].st_mode):
try:
# Platform question: if this op is atom
shutil.move(os.path.join(folder, each[0]), os.path.join(tmp, each[0]))
self._logger.info('Move file %s to %s', os.path.join(folder, each[0]), os.path.join(tmp, each[0]))
except IOError as err:
self._logger.warning('Fail to move file from %s to %s. Reason: %s.',
os.path.join(folder, each[0]), os.path.join(tmp, each[0]), err)
self._logger.info('%d cache files to upload.', len(candidates))

for each in os.listdir(tmp):
self._read_file(os.path.join(tmp, each))
for each in os.listdir(self._cache_dir):
self._read_file(os.path.join(self._cache_dir, each))

shutil.rmtree(tmp,
shutil.rmtree(self._cache_dir,
ignore_errors=True,
onerror=lambda _, p, tr: self._logger.error('Fail to remove file %s', p))
self._logger.info('Remove directory %s', tmp)
self._logger.info('Remove directory %s', self._cache_dir)

def _read_file(self, path):
""" Read content of a telemetry cache file and parse them into records. """
Expand All @@ -82,10 +59,7 @@ def _read_file(self, path):
def _add_record(self, content_line):
""" Parse a line in the recording file. """
try:
time, content = content_line.split(',', 1)
time = datetime.datetime.strptime(time, '%Y-%m-%dT%H:%M:%S')
if time > self._last_sent:
self._next_send = max(self._next_send, time)
self._records.append(content)
_, content = content_line.split(',', 1)
self._records.append(content)
except ValueError as err:
self._logger.warning("Fail to parse a line of the record %s. Error %s.", content_line, err)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import datetime
import os
import shutil
import tempfile
Expand All @@ -27,44 +26,22 @@ def tearDown(self):
shutil.rmtree(self.work_dir, ignore_errors=True)

def test_create_records_collection(self):
collection = RecordsCollection(datetime.datetime.min, self.work_dir)
collection = RecordsCollection(os.path.join(self.work_dir, TELEMETRY_CACHE_DIR))

# cache files are not moved while the collection is created
self.assert_cache_files_count(self.TEST_CACHE_FILE_COUNT)

# take snapshot and move the files
collection.snapshot_and_read()

# all files are moved, including the 'cache' file.
self.assert_cache_files_count(0)
# all files are moved, including the directory.
self.assertFalse(os.path.exists(os.path.join(self.work_dir, TELEMETRY_CACHE_DIR)))

# total records
self.assertEqual(1758, len([r for r in collection]))

def test_create_records_collection_with_last_send(self):
last_send = datetime.datetime.now() - datetime.timedelta(minutes=20)
collection = RecordsCollection(last_send, self.work_dir)
collection.snapshot_and_read()

# cache strategy has been dropped, so no `cache` file
self.assert_cache_files_count(0)
# no new records since last_send
self.assertEqual(0, len([r for r in collection]))

def test_create_records_collection_against_missing_config_folder(self):
collection = RecordsCollection(datetime.datetime.min, tempfile.mktemp())
self.assertEqual(0, len([r for r in collection]))

def test_create_records_collection_against_missing_folder(self):
shutil.rmtree(os.path.join(self.work_dir, TELEMETRY_CACHE_DIR))
collection = RecordsCollection(datetime.datetime.min, self.work_dir)
self.assertEqual(0, len([r for r in collection]))

def test_create_records_collection_against_empty_folder(self):
shutil.rmtree(os.path.join(self.work_dir, TELEMETRY_CACHE_DIR))
os.makedirs(os.path.join(self.work_dir, TELEMETRY_CACHE_DIR))

collection = RecordsCollection(datetime.datetime.min, self.TEST_RESOURCE_FOLDER)
collection = RecordsCollection(tempfile.mktemp())
self.assertEqual(0, len([r for r in collection]))

def assert_cache_files_count(self, count):
Expand Down
14 changes: 8 additions & 6 deletions src/azure-cli-telemetry/azure/cli/telemetry/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ def save_payload(config_dir, payload):
logger = logging.getLogger('telemetry.save')

if payload:
cache_saver = _create_rotate_file_logger(config_dir)
cache_saver, cache_dir = _create_rotate_file_logger(config_dir)
if cache_saver:
cache_saver.info(payload)
logger.info('Save telemetry record of length %d in cache', len(payload))
logger.info('Save telemetry record of length %d in cache file under %s', len(payload), cache_dir)

return True
return False
return cache_dir
return None


def _create_rotate_file_logger(log_dir):
cache_name = os.path.join(log_dir, 'telemetry', 'cache')
from datetime import datetime
now = datetime.now()
cache_name = os.path.join(log_dir, 'telemetry', now.strftime('%Y%m%d%H%M%S%f')[:-3], 'cache')
try:
if not os.path.exists(os.path.dirname(cache_name)):
os.makedirs(os.path.dirname(cache_name))
Expand All @@ -37,7 +39,7 @@ def _create_rotate_file_logger(log_dir):

logger = logging.Logger(name='telemetry_cache', level=logging.INFO)
logger.addHandler(handler)
return logger
return logger, os.path.dirname(cache_name)
except OSError as err:
logging.getLogger('telemetry.save').warning('Fail to create telemetry cache directory for %s. Reason %s.',
cache_name,
Expand Down