Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,12 @@ project(":samza-azure_$scalaSuffix") {
apply plugin: 'checkstyle'

dependencies {
compile "com.azure:azure-storage-blob:12.0.1"
compile "com.microsoft.azure:azure-storage:5.3.1"
compile "com.microsoft.azure:azure-eventhubs:1.0.1"
compile "com.fasterxml.jackson.core:jackson-core:2.8.8"
compile "io.dropwizard.metrics:metrics-core:3.1.2"
compile "org.apache.avro:avro:$avroVersion"
compile project(':samza-api')
compile project(":samza-core_$scalaSuffix")
compile "org.slf4j:slf4j-api:$slf4jVersion"
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
ext {
apacheCommonsCollections4Version = "4.0"
avroVersion = "1.7.1"
avroVersion = "1.7.7"
calciteVersion = "1.19.0"
commonsCliVersion = "1.2"
commonsCodecVersion = "1.9"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.system.azureblob;

import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;

/**
* This class holds all the metrics to be measured (like write, write byte, error)
* for a single group (like aggregate, system or source).
*/
public class AzureBlobBasicMetrics {
public static final String EVENT_WRITE_RATE = "eventWriteRate";
public static final String EVENT_PRODUCE_ERROR = "eventProduceError";
public static final String EVENT_WRITE_BYTE_RATE = "eventWriteByteRate";
public static final String EVENT_COMPRESS_BYTE_RATE = "eventCompressByteRate";
public static final String AZURE_BLOCK_UPLOAD_RATE = "azureBlockUploadRate";
public static final String AZURE_BLOB_COMMIT_RATE = "azureBlobCommitRate";

private final Counter writeMetrics;
private final Counter writeByteMetrics;
private final Counter errorMetrics;
private final Counter compressByteMetrics;
private final Counter azureUploadMetrics;
private final Counter azureCommitMetrics;

public AzureBlobBasicMetrics(String group, MetricsRegistry metricsRegistry) {
writeMetrics = metricsRegistry.newCounter(group, EVENT_WRITE_RATE);
errorMetrics = metricsRegistry.newCounter(group, EVENT_PRODUCE_ERROR);
writeByteMetrics = metricsRegistry.newCounter(group, EVENT_WRITE_BYTE_RATE);
compressByteMetrics = metricsRegistry.newCounter(group, EVENT_COMPRESS_BYTE_RATE);
azureUploadMetrics = metricsRegistry.newCounter(group, AZURE_BLOCK_UPLOAD_RATE);
azureCommitMetrics = metricsRegistry.newCounter(group, AZURE_BLOB_COMMIT_RATE);
}

/**
* Increments the write metrics counter by 1.
*/
public void updateWriteMetrics() {
writeMetrics.inc();
}

/**
* Increments the write byte metrics counter by the number of bytes written.
* @param dataLength number of bytes written.
*/
public void updateWriteByteMetrics(long dataLength) {
writeByteMetrics.inc(dataLength);
}

/**
* Increments the compress byte metrics counter by the number of compressed bytes written.
* @param dataLength number of bytes written.
*/
public void updateCompressByteMetrics(long dataLength) {
compressByteMetrics.inc(dataLength);
}

/**
* Increments the error metrics counter by 1.
*/
public void updateErrorMetrics() {
errorMetrics.inc();
}


/**
* Increments the azure block upload metrics counter by 1.
*/
public void updateAzureUploadMetrics() {
azureUploadMetrics.inc();
}


/**
* Increments the azure blob commit metrics counter by 1.
*/
public void updateAzureCommitMetrics() {
azureCommitMetrics.inc();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.system.azureblob;

import org.apache.samza.system.azureblob.compression.CompressionType;
import java.time.Duration;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.MapConfig;

public class AzureBlobConfig extends MapConfig {
private static final String SYSTEM_AZUREBLOB_PREFIX = "systems.%s.azureblob.";
//Server Instance Level Property

// The duration after which an Azure request will be logged as a warning.
public static final String AZURE_BLOB_LOG_SLOW_REQUESTS_MS = "samza.azureblob.log.slowRequestMs";
private static final long AZURE_BLOB_LOG_SLOW_REQUESTS_MS_DEFAULT = Duration.ofSeconds(30).toMillis();

// system Level Properties.
// fully qualified class name of the AzureBlobWriter impl for the producer system
public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME = SYSTEM_AZUREBLOB_PREFIX + "writer.factory.class";
public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.avro.AzureBlobAvroWriterFactory";

// Azure Storage Account name under which the Azure container representing this system is.
// System name = Azure container name (https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names)
public static final String SYSTEM_AZURE_ACCOUNT_NAME = SYSTEM_AZUREBLOB_PREFIX + "account.name";

// Azure Storage Account key associated with the Azure Storage Account
public static final String SYSTEM_AZURE_ACCOUNT_KEY = SYSTEM_AZUREBLOB_PREFIX + "account.key";

// Whether to use proxy while connecting to Azure Storage
public static final String SYSTEM_AZURE_USE_PROXY = SYSTEM_AZUREBLOB_PREFIX + "proxy.use";
public static final boolean SYSTEM_AZURE_USE_PROXY_DEFAULT = false;

// name of the host to be used as proxy
public static final String SYSTEM_AZURE_PROXY_HOSTNAME = SYSTEM_AZUREBLOB_PREFIX + "proxy.hostname";

// port in the proxy host to be used
public static final String SYSTEM_AZURE_PROXY_PORT = SYSTEM_AZUREBLOB_PREFIX + "proxy.port";

// type of compression to be used before uploading blocks : “none” or “gzip”
public static final String SYSTEM_COMPRESSION_TYPE = SYSTEM_AZUREBLOB_PREFIX + "compression.type";
private static final CompressionType SYSTEM_COMPRESSION_TYPE_DEFAULT = CompressionType.NONE;

// maximum size of uncompressed block in bytes
public static final String SYSTEM_MAX_FLUSH_THRESHOLD_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxFlushThresholdSize";
private static final int SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT = 10485760;

// maximum size of uncompressed blob in bytes
public static final String SYSTEM_MAX_BLOB_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxBlobSize";
private static final long SYSTEM_MAX_BLOB_SIZE_DEFAULT = Long.MAX_VALUE; // unlimited

// maximum number of messages in a blob
public static final String SYSTEM_MAX_MESSAGES_PER_BLOB = SYSTEM_AZUREBLOB_PREFIX + "maxMessagesPerBlob";
private static final long SYSTEM_MAX_MESSAGES_PER_BLOB_DEFAULT = Long.MAX_VALUE; // unlimited

// number of threads to asynchronously upload blocks
public static final String SYSTEM_THREAD_POOL_COUNT = SYSTEM_AZUREBLOB_PREFIX + "threadPoolCount";
private static final int SYSTEM_THREAD_POOL_COUNT_DEFAULT = 1;

// size of the queue to hold blocks ready to be uploaded by asynchronous threads.
// If all threads are busy uploading then blocks are queued and if queue is full then main thread will start uploading
// which will block processing of incoming messages
// Default - Thread Pool Count * 2
public static final String SYSTEM_BLOCKING_QUEUE_SIZE = SYSTEM_AZUREBLOB_PREFIX + "blockingQueueSize";

// timeout to finish uploading all blocks before committing a blob
public static final String SYSTEM_FLUSH_TIMEOUT_MS = SYSTEM_AZUREBLOB_PREFIX + "flushTimeoutMs";
private static final long SYSTEM_FLUSH_TIMEOUT_MS_DEFAULT = Duration.ofMinutes(3).toMillis();

// timeout to finish committing all the blobs currently being written to. This does not include the flush timeout per blob
public static final String SYSTEM_CLOSE_TIMEOUT_MS = SYSTEM_AZUREBLOB_PREFIX + "closeTimeoutMs";
private static final long SYSTEM_CLOSE_TIMEOUT_MS_DEFAULT = Duration.ofMinutes(5).toMillis();

// if true, a random string of 8 chars is suffixed to the blob name to prevent name collision
// when more than one Samza tasks are writing to the same SSP.
public static final String SYSTEM_SUFFIX_RANDOM_STRING_TO_BLOB_NAME = SYSTEM_AZUREBLOB_PREFIX + "suffixRandomStringToBlobName";
private static final boolean SYSTEM_SUFFIX_RANDOM_STRING_TO_BLOB_NAME_DEFAULT = true;

public AzureBlobConfig(Config config) {
super(config);
}

public String getAzureAccountKey(String systemName) {
String accountKey = get(String.format(SYSTEM_AZURE_ACCOUNT_KEY, systemName));
if (accountKey == null) {
throw new ConfigException("Azure account key is required.");
}
return accountKey;
}

public String getAzureAccountName(String systemName) {
String accountName = get(String.format(SYSTEM_AZURE_ACCOUNT_NAME, systemName));
if (accountName == null) {
throw new ConfigException("Azure account name is required.");
}
return accountName;
}

public boolean getUseProxy(String systemName) {
return getBoolean(String.format(SYSTEM_AZURE_USE_PROXY, systemName), SYSTEM_AZURE_USE_PROXY_DEFAULT);
}

public String getAzureProxyHostname(String systemName) {
String hostname = get(String.format(SYSTEM_AZURE_PROXY_HOSTNAME, systemName));
if (hostname == null) {
throw new ConfigException("Azure proxy host name is required.");
}
return hostname;
}

public int getAzureProxyPort(String systemName) {
return getInt(String.format(SYSTEM_AZURE_PROXY_PORT, systemName));
}

public CompressionType getCompressionType(String systemName) {
return CompressionType.valueOf(get(String.format(SYSTEM_COMPRESSION_TYPE, systemName), SYSTEM_COMPRESSION_TYPE_DEFAULT.name()).toUpperCase());
}

public String getAzureBlobWriterFactoryClassName(String systemName) {
return get(String.format(SYSTEM_WRITER_FACTORY_CLASS_NAME, systemName), SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT);
}

public int getMaxFlushThresholdSize(String systemName) {
return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT);
}

public int getAzureBlobThreadPoolCount(String systemName) {
return getInt(String.format(SYSTEM_THREAD_POOL_COUNT, systemName), SYSTEM_THREAD_POOL_COUNT_DEFAULT);
}

public int getBlockingQueueSize(String systemName) {
return getInt(String.format(SYSTEM_BLOCKING_QUEUE_SIZE, systemName), 2 * getAzureBlobThreadPoolCount(systemName));
}

public long getFlushTimeoutMs(String systemName) {
long timeout = getLong(String.format(SYSTEM_FLUSH_TIMEOUT_MS, systemName), SYSTEM_FLUSH_TIMEOUT_MS_DEFAULT);
if (timeout <= 0) {
throw new ConfigException("Azure Blob flush timeout can not be <= 0");
}
return timeout;
}

public long getCloseTimeoutMs(String systemName) {
long timeout = getLong(String.format(SYSTEM_CLOSE_TIMEOUT_MS, systemName), SYSTEM_CLOSE_TIMEOUT_MS_DEFAULT);
if (timeout <= 0) {
throw new ConfigException("Azure Blob close timeout can not be <= 0");
}
return timeout;
}

public long getLogSlowRequestsMs() {
long duration = getLong(AZURE_BLOB_LOG_SLOW_REQUESTS_MS, AZURE_BLOB_LOG_SLOW_REQUESTS_MS_DEFAULT);
if (duration <= 0) {
throw new ConfigException("Azure blob duration to log slow requests can not be <=0.");
}
return duration;
}

public boolean getSuffixRandomStringToBlobName(String systemName) {
return getBoolean(String.format(SYSTEM_SUFFIX_RANDOM_STRING_TO_BLOB_NAME, systemName), SYSTEM_SUFFIX_RANDOM_STRING_TO_BLOB_NAME_DEFAULT);
}

public long getMaxBlobSize(String systemName) {
return getLong(String.format(SYSTEM_MAX_BLOB_SIZE, systemName), SYSTEM_MAX_BLOB_SIZE_DEFAULT);
}

public long getMaxMessagesPerBlob(String systemName) {
return getLong(String.format(SYSTEM_MAX_MESSAGES_PER_BLOB, systemName), SYSTEM_MAX_MESSAGES_PER_BLOB_DEFAULT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.system.azureblob;

import java.util.Map;
import java.util.Set;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;


/**
* {@inheritDoc}
*/
public class AzureBlobSystemAdmin implements SystemAdmin {
public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
throw new UnsupportedOperationException("getOffsetsAfter not supported for AzureBlobSystemAdmin");
}

public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
throw new UnsupportedOperationException("getSystemStreamMetadata not supported for AzureBlobSystemAdmin");
}

public Integer offsetComparator(String offset1, String offset2) {
throw new UnsupportedOperationException("offsetComparator not supported for AzureBlobSystemAdmin");
}
}
Loading