Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,53 +50,81 @@ internal void EnqueueLogEvent(BetterStackLogEnvelope logEnvelope)
}
}

// This task runs continously until the cancellation token is canceled
private async Task FlushQueue()
{
while (!_cancellationTokenSource.IsCancellationRequested)
{
var limit = _currentConfig.BatchSize;
var batch = new List<BetterStackLogEnvelope>();

while (limit > 0 && _logQueue.TryTake(out var message))
int messagesFlushed = await FlushBatch().ConfigureAwait(false);
if (messagesFlushed == 0)
{
batch.Add(message);
limit--;
await Task.Delay(_currentConfig.FlushFrequency, _cancellationTokenSource.Token).ConfigureAwait(false);
}
}
}

// This method uploads a batch of messages to betterstack.com
private async Task<int> FlushBatch()
{
var limit = _currentConfig.BatchSize;
var batch = new List<BetterStackLogEnvelope>();

while (limit > 0 && _logQueue.TryTake(out var message))
{
batch.Add(message);
limit--;
}

if (batch.Any())
if (batch.Any())
{
try
{
try
{
await _client.UploadAsync(batch, _cancellationTokenSource.Token);
}
catch (Exception ex)
{
Console.WriteLine($"Failed to upload logs to BetterStack: {ex.Message}");
}
// Cancellation.None on the following line might seem counterintuitive, but it's
// actually quite important. We want to ensure the upload of a given batch of
// messages completes sucessfully even when _cancellationTokenSource.Token is
// cancelled (which indicates that BetterStackLoggerProvider is being disposed).
await _client.UploadAsync(batch, CancellationToken.None).ConfigureAwait(false);
}
else
catch (Exception ex)
{
await Task.Delay(_currentConfig.FlushFrequency, _cancellationTokenSource.Token);
Console.WriteLine($"Failed to upload logs to BetterStack: {ex.Message}");
}
}

return batch.Count;
}

public void Dispose()
{
_loggers.Clear();
_onChangeToken?.Dispose();
_cancellationTokenSource.Dispose();

// Stop "_flushTask" and wait for it completes
_cancellationTokenSource.Cancel();
_flushTask.Wait(TimeSpan.FromMilliseconds(500));

// There's a possibility the Task was stoped before the queue
// could be fully processed. Therefore we need to flush any
// remaining messages.
try
{
_flushTask.Wait(_currentConfig.FlushFrequency);
// Flush remaining messages until the queue is empty
int batchCount = 0;
do
{
batchCount = FlushBatch().GetAwaiter().GetResult();
} while (batchCount > 0);
}
catch (TaskCanceledException)
{
}
catch (AggregateException ex) when (ex.InnerExceptions.Count == 1 && ex.InnerExceptions[0] is TaskCanceledException)
{
}
finally
{
_cancellationTokenSource.Dispose();
}
}

public void SetScopeProvider(IExternalScopeProvider scopeProvider) => ScopeProvider = scopeProvider;
Expand Down