diff --git a/src/main/scala/com/async2databricks/s3/S3Writer.scala b/src/main/scala/com/async2databricks/s3/S3Writer.scala index 20ef778..2cf3ed0 100644 --- a/src/main/scala/com/async2databricks/s3/S3Writer.scala +++ b/src/main/scala/com/async2databricks/s3/S3Writer.scala @@ -33,18 +33,18 @@ object S3Writer extends CatsLogger { def createS3Client[F[_]: Async](config: S3Config): Resource[F, S3Client] = { val log = logger[F] Resource.make { - Async[F].delay { - val credentialsProvider = StaticCredentialsProvider.create( - AwsBasicCredentials.create(config.accessKey, config.secretKey) - ) + for { + client <- Async[F].delay { + val credentialsProvider = StaticCredentialsProvider.create( + AwsBasicCredentials.create(config.accessKey, config.secretKey) + ) - val builder = S3Client - .builder() - .credentialsProvider(credentialsProvider) - .region(Region.of(config.region)) + val builder = S3Client + .builder() + .credentialsProvider(credentialsProvider) + .region(Region.of(config.region)) - // Use custom endpoint for LocalStack - val client = + // Use custom endpoint for LocalStack if ( config.endpoint.nonEmpty && config.endpoint != "https://s3.amazonaws.com" ) { @@ -52,13 +52,9 @@ object S3Writer extends CatsLogger { } else { builder.build() } - - // Log effectfully - log.info( - s"S3 client created for endpoint: ${config.endpoint}" - ) // Only for side-effect, not recommended in prod - client - } + } + _ <- log.info(s"S3 client created for endpoint: ${config.endpoint}") + } yield client }(client => Async[F].delay(client.close())) } @@ -69,23 +65,32 @@ object S3Writer extends CatsLogger { bucketName: String ): F[Unit] = { val log = logger[F] - Async[F].delay { - try { - s3Client.headBucket( - HeadBucketRequest.builder().bucket(bucketName).build() - ) - log.info( - s"Bucket $bucketName already exists" - ) // Only for side-effect, not recommended in prod - } catch { - case _: NoSuchBucketException => - log.info(s"Creating bucket $bucketName") - s3Client.createBucket( - CreateBucketRequest.builder().bucket(bucketName).build() + Async[F] + .delay { + try { + s3Client.headBucket( + HeadBucketRequest.builder().bucket(bucketName).build() ) - log.info(s"Bucket $bucketName created successfully") + Right(()) + } catch { + case _: NoSuchBucketException => + Left(()) + } + } + .flatMap { + case Right(_) => + log.info(s"Bucket $bucketName already exists") + case Left(_) => + for { + _ <- log.info(s"Creating bucket $bucketName") + _ <- Async[F].delay { + s3Client.createBucket( + CreateBucketRequest.builder().bucket(bucketName).build() + ) + } + _ <- log.info(s"Bucket $bucketName created successfully") + } yield () } - } } def apply[F[_]: Async](config: S3Config): Resource[F, S3Writer[F]] = {