72 changes: 19 additions & 53 deletions .github/workflows/dotnet.yml
@@ -14,75 +14,39 @@ jobs:

  build-and-test:
    runs-on: ubuntu-latest
    services:
      localstack:
        image: localstack/localstack:latest
        ports:
          - 4563-4599:4563-4599
          - 8055:8080
        env:
          SERVICES: s3
          DEBUG: 1
          DATA_DIR: /tmp/localstack/data
          AWS_SECRET_KEY: 'localkey'
          AWS_BUCKET_NAME: 'managed-code-bucket'
          AWS_ACCESS_KEY_ID: 'localkey'
          AWS_SECRET_ACCESS_KEY: 'localsecret'
          DEFAULT_REGION: 'eu-west-1'

    steps:

      - name: checkout
        uses: actions/checkout@v3

      - uses: actions/checkout@v3
      - name: Setup .NET
        uses: actions/setup-dotnet@v3
        with:
          dotnet-version: 7.0.x

      - name: NDepend
        uses: ndepend/ndepend-action@v1
        with:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          license: ${{ secrets.NDEPENDLICENSE }}

      - name: azuright
        uses: potatoqualitee/azuright@v1.1

      - name: docker run fake-gcs-server
        run: |
          docker run -d --name fake-gcs-server -p 4443:4443 -v ${PWD}/examples/data:/data fsouza/fake-gcs-server -scheme http -external-url "http://localhost:4443"
          sleep 5s

      - name: check storage emulators
        run: |
          curl http://localhost:4443/
          curl http://localhost:4566/
          curl http://localhost:10000/

      # run build and test

      - name: Restore dependencies
        run: dotnet restore

      - name: Build
        run: dotnet build --no-restore

      - name: Test
        run: dotnet test --no-build --logger 'trx;LogFileName=test-results.trx'
        env:
          DEFAULT_REGION: eu-west-1
          AWS_ACCESS_KEY_ID: localkey
          AWS_SECRET_ACCESS_KEY: localsecret

      - name: Collect Code Coverage
        run: dotnet test --no-build --verbosity normal /p:CollectCoverage=true /p:CoverletOutputFormat=lcov /p:CoverletOutput=ManagedCode.Storage.Tests/lcov.info
        env:
          DEFAULT_REGION: eu-west-1
          AWS_ACCESS_KEY_ID: localkey
          AWS_SECRET_ACCESS_KEY: localsecret

      - name: NDepend
        uses: ndepend/ndepend-action@v1
        with:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          license: ${{ secrets.NDEPENDLICENSE }}
          coveragefolder: ManagedCode.Storage.Tests
          baseline: recent
          #baseline: main_recent

      # - name: test-reports
      #   uses: dorny/test-reporter@v1.5.0
      #   with:
      #     name: Test Reporter
      #     reporter: dotnet-trx
      #     path: ManagedCode.Storage.Tests/test-results.trx

      - name: coverlet
        uses: b3b00/coverlet-action@1.1.9
        with:
@@ -95,3 +59,5 @@ jobs:
        with:
          github-token: ${{ secrets.GITHUB_TOKEN }}
          path-to-lcov: ManagedCode.Storage.Tests/lcov.info
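The Test and coverage steps export the same credentials that the LocalStack service block defines. To reproduce the CI environment outside Actions, here is a sketch of pointing the AWS SDK at the emulator; the factory class is illustrative rather than part of this repository, and it assumes LocalStack's default edge endpoint on port 4566 (the same port the check storage emulators step curls):

using Amazon.Runtime;
using Amazon.S3;

public static class LocalStackClientFactory
{
    // Hypothetical helper mirroring the workflow's env values; not in the repo.
    public static IAmazonS3 Create()
    {
        var config = new AmazonS3Config
        {
            ServiceURL = "http://localhost:4566",  // LocalStack edge endpoint
            ForcePathStyle = true,                 // path-style addressing for non-AWS endpoints
            AuthenticationRegion = "eu-west-1"
        };
        return new AmazonS3Client(new BasicAWSCredentials("localkey", "localsecret"), config);
    }
}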


4 changes: 2 additions & 2 deletions Directory.Build.props
@@ -17,8 +17,8 @@
    <RepositoryUrl>https://github.com/managedcode/Storage</RepositoryUrl>
    <PackageProjectUrl>https://github.com/managedcode/Storage</PackageProjectUrl>
    <Product>Managed Code - Storage</Product>
    <Version>2.1.14</Version>
    <PackageVersion>2.1.14</PackageVersion>
    <Version>2.1.15-alpha</Version>
    <PackageVersion>2.1.15-alpha</PackageVersion>

  </PropertyGroup>
  <PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
18 changes: 9 additions & 9 deletions ManagedCode.Storage.Aws/AWSStorage.cs
@@ -34,7 +34,7 @@ public override async Task<Result> RemoveContainerAsync(CancellationToken cancel
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message, ex);
            _logger.LogException(ex);
            return Result.Fail(ex);
        }
    }
@@ -104,7 +104,7 @@ protected override async Task<Result> CreateContainerInternalAsync(CancellationT
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message, ex);
            _logger.LogException(ex);
            return Result.Fail(ex);
        }
    }
@@ -124,7 +124,7 @@ protected override async Task<Result> DeleteDirectoryInternalAsync(string direct
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message, ex);
            _logger.LogException(ex);
            return Result.Fail(ex);
        }
    }
@@ -183,7 +183,7 @@ await localFile.CopyFromStreamAsync(await StorageClient.GetObjectStreamAsync(Sto
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message, ex);
            _logger.LogException(ex);
            return Result<LocalFile>.Fail(ex);
        }
    }
@@ -211,7 +211,7 @@ await StorageClient.DeleteObjectAsync(new DeleteObjectRequest
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message, ex);
            _logger.LogException(ex);
            return Result<bool>.Fail(ex);
        }
    }
@@ -235,7 +235,7 @@ protected override async Task<Result<bool>> ExistsInternalAsync(ExistOptions opt
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message, ex);
            _logger.LogException(ex);
            return Result<bool>.Fail(ex);
        }
    }
@@ -269,7 +269,7 @@ protected override async Task<Result<BlobMetadata>> GetBlobMetadataInternalAsync
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message, ex);
            _logger.LogException(ex);
            return Result<BlobMetadata>.Fail(ex);
        }
    }
@@ -301,7 +301,7 @@ protected override async Task<Result> SetLegalHoldInternalAsync(bool hasLegalHol
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message, ex);
            _logger.LogException(ex);
            return Result.Fail(ex);
        }
    }
@@ -323,7 +323,7 @@ protected override async Task<Result<bool>> HasLegalHoldInternalAsync(LegalHoldO
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message, ex);
            _logger.LogException(ex);
            return Result<bool>.Fail(ex);
        }
    }
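Every catch block swaps _logger?.LogError(ex.Message, ex), an overload misuse that treats the exception as a format argument and drops the stack trace, for a LogException helper. That helper is assumed to ship with the bumped ManagedCode.Communication package; a minimal sketch of the shape these call sites imply:

using System;
using Microsoft.Extensions.Logging;

public static class LoggerExceptionExtensions
{
    // Hypothetical sketch; the real extension is assumed to live in a shared package.
    public static void LogException(this ILogger logger, Exception ex)
    {
        // Put the exception in the dedicated exception slot so structured
        // logging providers capture the full stack trace, not just the message.
        logger?.LogError(ex, ex.Message);
    }
}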
186 changes: 186 additions & 0 deletions ManagedCode.Storage.Aws/BlobStream.cs
@@ -0,0 +1,186 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon.S3.Model;

namespace ManagedCode.Storage.Aws;

public class BlobStream : Stream
{
    /* Note that the maximum size (as of now) of a single S3 object is 5TB, so it
     * isn't safe to assume all uploads will fit here: MAX_PART_LENGTH times
     * MAX_PART_COUNT is ~50TB, which is too big for S3. */
    const long MIN_PART_LENGTH = 5L * 1024 * 1024; // all parts but the last this size or greater
    const long MAX_PART_LENGTH = 5L * 1024 * 1024 * 1024; // 5GB max per PUT
    const long MAX_PART_COUNT = 10000; // no more than 10,000 parts total
    const long DEFAULT_PART_LENGTH = MIN_PART_LENGTH;
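
    // Worked sizing, from the constants above: 5 GiB x 10,000 parts is roughly
    // 50 TB of theoretical multipart capacity, while S3 caps a single object at
    // 5 TB. SetLength below therefore derives the part length as length / 10,000,
    // clamped to [5 MiB, 5 GiB]; a known 1 TiB upload, for example, gets parts of
    // about 105 MiB and stays within the 10,000-part limit.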

    internal class Metadata
    {
        public string BucketName;
        public string Key;
        public long PartLength = DEFAULT_PART_LENGTH;

        public int PartCount = 0;
        public string UploadId;
        public MemoryStream CurrentStream;

        public long Position = 0; // based on bytes written
        public long Length = 0; // based on bytes written or SetLength, whichever is larger (no truncation)

        public List<Task> Tasks = new List<Task>();
        public ConcurrentDictionary<int, string> PartETags = new ConcurrentDictionary<int, string>();
    }

    Metadata _metadata = new Metadata();
    IAmazonS3 _s3 = null;

    public BlobStream(IAmazonS3 s3, string s3uri, long partLength = DEFAULT_PART_LENGTH)
        : this(s3, new Uri(s3uri), partLength)
    {
    }

    public BlobStream(IAmazonS3 s3, Uri s3uri, long partLength = DEFAULT_PART_LENGTH)
        : this(s3, s3uri.Host, s3uri.LocalPath.Substring(1), partLength)
    {
    }

    public BlobStream(IAmazonS3 s3, string bucket, string key, long partLength = DEFAULT_PART_LENGTH)
    {
        _s3 = s3;
        _metadata.BucketName = bucket;
        _metadata.Key = key;
        _metadata.PartLength = partLength;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_metadata != null)
            {
                Flush(true);
                CompleteUpload();
            }
        }
        _metadata = null;
        base.Dispose(disposing);
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => _metadata.Length = Math.Max(_metadata.Length, _metadata.Position);

    public override long Position
    {
        get => _metadata.Position;
        set => throw new NotImplementedException();
    }

    public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();

    public override void SetLength(long value)
    {
        _metadata.Length = Math.Max(_metadata.Length, value);
        _metadata.PartLength = Math.Max(MIN_PART_LENGTH, Math.Min(MAX_PART_LENGTH, _metadata.Length / MAX_PART_COUNT));
    }

    private void StartNewPart()
    {
        if (_metadata.CurrentStream != null)
        {
            Flush(false);
        }
        _metadata.CurrentStream = new MemoryStream();
        _metadata.PartLength = Math.Min(MAX_PART_LENGTH, Math.Max(_metadata.PartLength, (_metadata.PartCount / 2 + 1) * MIN_PART_LENGTH));
    }
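
    // The growth rule above widens parts by 5 MiB every two parts (parts 1 and 2
    // use 5 MiB, parts 3 and 4 use 10 MiB, and so on), so a stream of unknown
    // length can keep growing without exhausting the 10,000-part budget early;
    // the Math.Max also preserves any larger value already chosen by SetLength.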

    public override void Flush()
    {
        Flush(false);
    }

    private void Flush(bool disposing)
    {
        if ((_metadata.CurrentStream == null || _metadata.CurrentStream.Length < MIN_PART_LENGTH) &&
            !disposing)
            return;

        if (_metadata.UploadId == null)
        {
            _metadata.UploadId = _s3.InitiateMultipartUploadAsync(new InitiateMultipartUploadRequest()
            {
                BucketName = _metadata.BucketName,
                Key = _metadata.Key
            }).GetAwaiter().GetResult().UploadId;
        }

        if (_metadata.CurrentStream != null)
        {
            var i = ++_metadata.PartCount;

            _metadata.CurrentStream.Seek(0, SeekOrigin.Begin);
            var request = new UploadPartRequest()
            {
                BucketName = _metadata.BucketName,
                Key = _metadata.Key,
                UploadId = _metadata.UploadId,
                PartNumber = i,
                IsLastPart = disposing,
                InputStream = _metadata.CurrentStream
            };
            _metadata.CurrentStream = null;

            var upload = Task.Run(async () =>
            {
                var response = await _s3.UploadPartAsync(request);
                _metadata.PartETags.AddOrUpdate(i, response.ETag,
                    (n, s) => response.ETag);
                request.InputStream.Dispose();
            });
            _metadata.Tasks.Add(upload);
        }
    }

    private void CompleteUpload()
    {
        Task.WaitAll(_metadata.Tasks.ToArray());

        if (Length > 0)
        {
            _s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest()
            {
                BucketName = _metadata.BucketName,
                Key = _metadata.Key,
                PartETags = _metadata.PartETags.Select(e => new PartETag(e.Key, e.Value)).ToList(),
                UploadId = _metadata.UploadId
            }).GetAwaiter().GetResult();
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;

        // write as much of the buffer as will fit to the current part, and if needed
        // allocate a new part and continue writing to it (and so on).
        var o = offset;
        var c = Math.Min(count, buffer.Length - offset); // don't over-read the buffer, even if asked to
        do
        {
            if (_metadata.CurrentStream == null || _metadata.CurrentStream.Length >= _metadata.PartLength)
                StartNewPart();

            var remaining = _metadata.PartLength - _metadata.CurrentStream.Length;
            var w = Math.Min(c, (int)remaining);
            _metadata.CurrentStream.Write(buffer, o, w);

            _metadata.Position += w;
            c -= w;
            o += w;
        } while (c > 0);
    }
}
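
A minimal usage sketch of the new stream (bucket, key, and row count are placeholder values): writes buffer into parts of at least 5 MiB that upload in the background, and disposing the stream sends the final part and completes the multipart upload.

using System.IO;
using System.Text;
using Amazon.S3;
using ManagedCode.Storage.Aws;

public static class BlobStreamExample
{
    // Hypothetical example; values here are illustrative only.
    public static void UploadCsv(IAmazonS3 s3)
    {
        using (var blob = new BlobStream(s3, "managed-code-bucket", "reports/rows.csv"))
        using (var writer = new StreamWriter(blob, Encoding.UTF8))
        {
            for (var i = 0; i < 1_000_000; i++)
            {
                writer.WriteLine($"row-{i},{i * 2}");
            }
        } // writer flushes into blob; disposing blob uploads the last part and completes the upload
    }
}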
6 changes: 3 additions & 3 deletions ManagedCode.Storage.Aws/ManagedCode.Storage.Aws.csproj
@@ -22,10 +22,10 @@
  </ItemGroup>

  <ItemGroup>
    <PackageReference Include="ManagedCode.Communication" Version="2.0.23" />
    <PackageReference Include="AWSSDK.S3" Version="3.7.104.19" />
    <PackageReference Include="ManagedCode.Communication" Version="2.0.26" />
    <PackageReference Include="AWSSDK.S3" Version="3.7.205.10" />
    <PackageReference Include="Humanizer.Core" Version="2.14.1" />
    <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
    <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.1" />
    <PackageReference Include="System.Linq.Async" Version="6.0.1" />
  </ItemGroup>