diff --git a/build.gradle b/build.gradle index c6452ac2c4..3820c8d33f 100644 --- a/build.gradle +++ b/build.gradle @@ -263,46 +263,6 @@ project(":samza-aws_$scalaSuffix") { } } - -project(":samza-autoscaling_$scalaSuffix") { - apply plugin: 'scala' - apply plugin: 'checkstyle' - - // Force scala joint compilation - sourceSets.main.scala.srcDir "src/main/java" - sourceSets.test.scala.srcDir "src/test/java" - - // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting - // tasks.compileTestJava.enabled = false - sourceSets.main.java.srcDirs = [] - sourceSets.test.java.srcDirs = [] - - dependencies { - compile project(':samza-api') - compile project(":samza-core_$scalaSuffix") - compile "org.scala-lang:scala-library:$scalaVersion" - compile "org.slf4j:slf4j-api:$slf4jVersion" - compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" - compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" - compile "org.eclipse.jetty:jetty-webapp:$jettyVersion" - compile("org.apache.hadoop:hadoop-yarn-client:$yarnVersion") { - exclude module: 'servlet-api' - } - compile("org.apache.hadoop:hadoop-common:$yarnVersion") { - exclude module: 'servlet-api' - } - compile "org.apache.httpcomponents:httpclient:$httpClientVersion" - testCompile "junit:junit:$junitVersion" - testCompile "org.mockito:mockito-core:$mockitoVersion" - testCompile "org.scalatest:scalatest_$scalaSuffix:$scalaTestVersion" - } - - checkstyle { - configFile = new File(rootDir, "checkstyle/checkstyle.xml") - toolVersion = "$checkstyleVersion" - } -} - project(":samza-elasticsearch_$scalaSuffix") { apply plugin: 'java' diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java deleted file mode 100644 index 1d319d676a..0000000000 --- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java +++ /dev/null @@ -1,376 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.apache.samza.autoscaling.deployer; - -import joptsimple.OptionSet; - -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.samza.autoscaling.utils.YarnUtil; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.container.SamzaContainer; -import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer; -import org.apache.samza.coordinator.stream.messages.SetConfig; -import org.apache.samza.job.JobRunner; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.metrics.MetricsRegistryMap; -import org.apache.samza.system.SystemStreamPartitionIterator; -import org.apache.samza.util.CommandLine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - - -/** - * This class is a separate module that runs along side with a job, and handles all config changes submitted to a job after the bootstrap of the job. - * All config changes are written to the coordinator stream using the @Link{CoordinatorStreamWriter}. - * The way this class works is that it reads all messages with type "set-config" written to the coordinator stream after - * the bootstrap of the job, and it handles the messages accordingly. - * The current configuration changes it handles are - * 1. changing the number of containers of a job - * 2. setting the server url for the first time (in order to get the JobModel). - * In order to use this class the run() method should be called to react to changes, - * or call the start(), processConfigMessages(), and stop() function instead. - * Additionally, you have to add the following configurations to the config file: - * yarn.rm.address=localhost //the ip of the resource manager in yarn - * yarn.rm.port=8088 //the port of the resource manager http server - * Additionally, the config manger will periodically poll the coordinator stream to see if there are any new messages. - * This period is set to 100 ms by default. However, it can be configured by adding the following property to the input config file. - * configManager.polling.interval=< polling interval > - */ - -public class ConfigManager { - private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer; - private SystemStreamPartitionIterator coordinatorStreamIterator; - private static final Logger log = LoggerFactory.getLogger(ConfigManager.class); - private final long defaultPollingInterval = 100; - private final int defaultReadJobModelDelayMs = 100; - private final long interval; - private String coordinatorServerURL = null; - private final String jobName; - private final int jobID; - private Config config; - - private YarnUtil yarnUtil; - - private final String rmAddressOpt = "yarn.rm.address"; - private final String rmPortOpt = "yarn.rm.port"; - private final String pollingIntervalOpt = "configManager.polling.interval"; - private static final String SERVER_URL_OPT = "samza.autoscaling.server.url"; - private static final String YARN_CONTAINER_COUNT_OPT = "yarn.container.count"; - - public ConfigManager(Config config) { - - //get rm address and port - if (!config.containsKey(rmAddressOpt) || !config.containsKey(rmPortOpt)) { - throw new IllegalArgumentException("Missing config: the config file does not contain the rm host or port."); - } - String rmAddress = config.get(rmAddressOpt); - int rmPort = config.getInt(rmPortOpt); - - //get job name and id; - if (!config.containsKey(JobConfig.JOB_NAME)) { - throw new IllegalArgumentException("Missing config: the config does not contain the job name"); - } - jobName = config.get(JobConfig.JOB_NAME); - jobID = config.getInt(JobConfig.JOB_ID, 1); - - //set polling interval - if (config.containsKey(pollingIntervalOpt)) { - long pollingInterval = config.getLong(pollingIntervalOpt); - if (pollingInterval <= 0) { - throw new IllegalArgumentException("polling interval should be greater than 0"); - } - this.interval = pollingInterval; - } else { - this.interval = defaultPollingInterval; - } - - this.config = config; - this.coordinatorStreamConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap()); - this.yarnUtil = new YarnUtil(rmAddress, rmPort); - } - - /** - * This method is an infinite loop that periodically checks if there are any new messages in the job coordinator stream, and reads them if they exist. - * Then it reacts accordingly based on the configuration that is being set. - * The method the calls the start() method to initialized the system, runs in a infinite loop, and calls the stop() method at the end to stop the consumer and the system - */ - private void run() { - start(); - try { - while (true) { - Thread.sleep(interval); - processConfigMessages(); - } - } catch (InterruptedException e) { - e.printStackTrace(); - log.warn("Got interrupt in config manager thread, so shutting down"); - Thread.currentThread().interrupt(); - } finally { - log.info("Stopping the config manager"); - stop(); - } - } - - /** - * Starts the system by starting the consumer - */ - private void start() { - register(); - coordinatorStreamConsumer.start(); - coordinatorStreamIterator = coordinatorStreamConsumer.getStartIterator(); - bootstrap(); - } - - /** - * stops the consumer making the system ready to stop - */ - private void stop() { - coordinatorStreamConsumer.stop(); - coordinatorServerURL = null; - yarnUtil.stop(); - } - - /** - * registers the consumer - */ - private void register() { - coordinatorStreamConsumer.register(); - } - - - /** - * This function will bootstrap by reading all the unread messages until the moment of calling the function, and therefore find the server url. - */ - private void bootstrap() { - List keysToProcess = new LinkedList<>(); - keysToProcess.add(SERVER_URL_OPT); - processConfigMessages(keysToProcess); - if (coordinatorServerURL == null) { - throw new IllegalStateException("coordinator server url is null, while the bootstrap has finished "); - } - log.info("Config manager bootstrapped"); - } - - /** - * notAValidEvent all the unread messages up to the time this function is called. - * This method just reads the messages, and it does not react to them or change any configuration of the system. - */ - private void skipUnreadMessages() { - processConfigMessages(Collections.emptyList()); - log.info("Config manager skipped messages"); - } - - /** - * This function reads all the messages with "set-config" type added to the coordinator stream since the last time the method was invoked - */ - private void processConfigMessages() { - List keysToProcess = Arrays.asList(YARN_CONTAINER_COUNT_OPT, SERVER_URL_OPT); - processConfigMessages(keysToProcess); - } - - /** - * This function reads all the messages with "set-config" type added to the coordinator stream since the last time the method was invoked - * - * @param keysToProcess a list of keys to process. Only messages with these keys will call their handler function, - * and other messages will be skipped. If the list is empty all messages will be skipped. - */ - @SuppressWarnings("unchecked") - private void processConfigMessages(List keysToProcess) { - if (!coordinatorStreamConsumer.hasNewMessages(coordinatorStreamIterator)) { - return; - } - if (keysToProcess == null) { - throw new IllegalArgumentException("The keys to process list is null"); - } - for (CoordinatorStreamMessage message : coordinatorStreamConsumer.getUnreadMessages(coordinatorStreamIterator, SetConfig.TYPE)) { - String key = null; - try { - SetConfig setConfigMessage = new SetConfig(message); - key = setConfigMessage.getKey(); - Map valuesMap = (Map) setConfigMessage.getMessageMap().get("values"); - String value = null; - if (valuesMap != null) { - value = valuesMap.get("value"); - } - - log.debug("Received set-config message with key: " + key + " and value: " + value); - - if (keysToProcess.contains(key)) { - if (key.equals(YARN_CONTAINER_COUNT_OPT)) { - handleYarnContainerChange(value); - } else if (key.equals(SERVER_URL_OPT)) { - handleServerURLChange(value); - } else { - log.info("Setting the " + key + " configuration is currently not supported, skipping the message"); - } - } - - //TODO: change the handlers to implement a common interface, to make them pluggable - } catch (Exception e) { - log.error("Error in reading a message, skipping message with key " + key); - } - - } - - } - - /** - * This method handle setConfig messages that want to change the url of the server the JobCoordinator has brought up. - * - * @param newServerURL the new value of the server URL - */ - private void handleServerURLChange(String newServerURL) { - this.coordinatorServerURL = newServerURL; - log.info("Server URL being set to " + newServerURL); - } - - /** - * This method handles setConfig messages that want to change the number of containers of a job - * - * @param containerCountAsString the new number of containers in a String format - */ - private void handleYarnContainerChange(String containerCountAsString) throws IOException, YarnException { - String applicationId = yarnUtil.getRunningAppId(jobName, jobID); - - int containerCount = Integer.valueOf(containerCountAsString); - - //checking the input is valid - int currentNumTask = getCurrentNumTasks(); - int currentNumContainers = getCurrentNumContainers(); - if (containerCount == currentNumContainers) { - log.error("The new number of containers is equal to the current number of containers, skipping this message"); - return; - } - if (containerCount <= 0) { - log.error("The number of containers cannot be zero or less, skipping this message"); - return; - } - - - if (containerCount > currentNumTask) { - log.error("The number of containers cannot be more than the number of task, skipping this message"); - return; - } - - - //killing the current job - log.info("Killing the current job"); - yarnUtil.killApplication(applicationId); - //reset the global variables - coordinatorServerURL = null; - - - try { - //waiting for the job to be killed - String state = yarnUtil.getApplicationState(applicationId); - Thread.sleep(1000); - int countSleep = 1; - - while (!state.equals("KILLED")) { - state = yarnUtil.getApplicationState(applicationId); - log.info("Job kill signal sent, but job not killed yet for " + applicationId + ". Sleeping for another 1000ms"); - Thread.sleep(1000); - countSleep++; - if (countSleep > 10) { - throw new IllegalStateException("Job has not been killed after 10 attempts."); - } - } - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - } - - log.info("Killed the current job successfully"); - - //start the job again - log.info("Staring the job again"); - skipUnreadMessages(); - JobRunner jobRunner = new JobRunner(config); - jobRunner.run(false); - } - - - /** - * This method returns the number of tasks in the job. It works by querying the server, and getting the job model. - * Then it extracts the number of tasks from the job model - * - * @return current number of tasks in the job - */ - private int getCurrentNumTasks() { - int currentNumTasks = 0; - for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values()) { - currentNumTasks += containerModel.getTasks().size(); - } - return currentNumTasks; - } - - /** - * This method returns the number of containers in the job. It works by querying the server, and getting the job model. - * Then it extracts the number of containers from the job model - * - * @return current number of containers in the job - */ - private int getCurrentNumContainers() { - return SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values().size(); - } - - - /** - * Gets the current value of the server URL that the job coordinator is serving the job model on. - * - * @return the current server URL. If null, it means the job has not set the server yet. - */ - public String getCoordinatorServerURL() { - return coordinatorServerURL; - } - - /** - * Main function for using the Config Manager. The main function starts a Config Manager, and reacts to all messages thereafter - * In order for this module to run, you have to add the following configurations to the config file: - * yarn.rm.address=localhost //the ip of the resource manager in yarn - * yarn.rm.port=8088 //the port of the resource manager http server - * Additionally, the config manger will periodically poll the coordinator stream to see if there are any new messages. - * This period is set to 100 ms by default. However, it can be configured by adding the following property to the input config file. - * configManager.polling.interval= < polling interval > - * To run the code use the following command: - * {path to samza deployment}/samza/bin/run-config-manager.sh --config-factory={config-factory} --config-path={path to config file of a job} - * - * @param args input arguments for running ConfigManager. - */ - public static void main(String[] args) { - CommandLine cmdline = new CommandLine(); - OptionSet options = cmdline.parser().parse(args); - Config config = cmdline.loadConfig(options); - ConfigManager configManager = new ConfigManager(config); - configManager.run(); - } - - -} diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java deleted file mode 100644 index 7331f61c75..0000000000 --- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.autoscaling.utils; - -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.util.EntityUtils; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.type.TypeReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * This is a helper class to interact with yarn. Some of the functionalities it provides are killing an application, - * getting the state of an application, getting an application id given the job name and job id. - */ -public class YarnUtil { - private static final Logger log = LoggerFactory.getLogger(YarnUtil.class); - private final CloseableHttpClient httpClient; - private final HttpHost rmServer; - private final YarnClient yarnClient; - - public YarnUtil(String rmAddress, int rmPort) { - this.httpClient = HttpClientBuilder.create().build(); - this.rmServer = new HttpHost(rmAddress, rmPort, "http"); - log.info("setting rm server to : " + rmServer); - YarnConfiguration hConfig = new YarnConfiguration(); - hConfig.set(YarnConfiguration.RM_ADDRESS, rmAddress + ":" + YarnConfiguration.DEFAULT_RM_PORT); - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(hConfig); - yarnClient.start(); - } - - /** - * Queries rm for all the applications currently running and finds the application with the matching job name and id - * - * @param jobName the name of the job - * @param jobID the job id - * @return the application id of the job running in yarn. If application id is not found, it will return null. - */ - public String getRunningAppId(String jobName, int jobID) { - - try { - HttpGet getRequest = new HttpGet("/ws/v1/cluster/apps"); - HttpResponse httpResponse = httpClient.execute(rmServer, getRequest); - String applications = EntityUtils.toString(httpResponse.getEntity()); - log.debug("applications: " + applications); - - List> applicationList = parseYarnApplications(applications); - String name = jobName + "_" + jobID; - for (Map application : applicationList) { - if ("RUNNING".equals(application.get("state")) && name.equals(application.get("name")) && application.containsKey("id")) { - return application.get("id"); - } - } - } catch (NullPointerException | IOException e) { - e.printStackTrace(); - throw new IllegalStateException("there is no valid application id for the given job name and job id. job name: " + jobName + " job id: " + jobID); - } - - return null; - } - - List> parseYarnApplications(String applications) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - Map>>> yarnApplications = mapper.readValue(applications, new TypeReference>>>>() { - }); - return yarnApplications.get("apps").get("app"); - } - - /** - * This function returns the state of a given application. This state can be on of the - * {"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING", "FINISHED", "FAILED", "KILLED"} - * - * @param applicationId the application id of the application the state is being queried - * @return the state of the application which is one of the following values: {"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING", "FINISHED", "FAILED", "KILLED"} - * @throws IOException Throws IO exception - * @throws YarnException in case of errors or if YARN rejects the request due to - * access-control restrictions. - */ - public String getApplicationState(String applicationId) throws IOException, YarnException { - - return yarnClient.getApplicationReport(getApplicationIDFromString(applicationId)).getYarnApplicationState().toString(); - - } - - /** - * This function kills an application given the applicationId - * - * @param applicationId the application Id of the job to be killed - * @throws IOException Throws IO exception - * @throws YarnException in case of errors or if YARN rejects the request due to - * access-control restrictions. - */ - public void killApplication(String applicationId) throws IOException, YarnException { - - log.info("killing job with application id: " + applicationId); - - yarnClient.killApplication(getApplicationIDFromString(applicationId)); - } - - /** - * This function converts an application in form of a String into a {@link ApplicationId} - * - * @param appIDStr The application id in form of a string - * @return the application id as an instance of ApplicationId class. - */ - private ApplicationId getApplicationIDFromString(String appIDStr) { - String[] parts = appIDStr.split("_"); - if (parts.length < 3) { - throw new IllegalStateException("the application id found is not valid. application id: " + appIDStr); - } - long timestamp = Long.valueOf(parts[1]); - int id = Integer.valueOf(parts[2]); - return ApplicationId.newInstance(timestamp, id); - } - - /** - * This function stops the YarnUtil by stopping the yarn client and http client. - */ - public void stop() { - try { - httpClient.close(); - } catch (IOException e) { - log.error("HTTP Client failed to close.", e); - } - yarnClient.stop(); - } - -} diff --git a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java deleted file mode 100644 index 7b4b74ea90..0000000000 --- a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.samza.autoscaling.utils; - -import org.apache.commons.io.IOUtils; -import org.junit.Test; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class YarnUtilTest { - - @Test - public void handleJsonArraysAsWellAsStrings() throws IOException { - YarnUtil yarnUtil = new YarnUtil("rm", 0); - List> applications = yarnUtil.parseYarnApplications(IOUtils.toString(getClass().getClassLoader().getResourceAsStream("exampleResourceManagerOutput.json"))); - assertEquals("RUNNING", applications.get(0).get("state")); - } -} diff --git a/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json b/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json deleted file mode 100644 index 9f8a02519a..0000000000 --- a/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json +++ /dev/null @@ -1,121 +0,0 @@ -{ - "apps": { - "app": [ - { - "id": "application_1459790549146_0003", - "user": "root", - "name": "protodeserializer_1", - "queue": "default", - "state": "RUNNING", - "finalStatus": "UNDEFINED", - "progress": 0, - "trackingUI": "ApplicationMaster", - "trackingUrl": "http://yarnrm:8088/proxy/application_1459790549146_0003/", - "diagnostics": "", - "clusterId": 1459790549146, - "applicationType": "Samza", - "applicationTags": "", - "startedTime": 1459792852675, - "finishedTime": 0, - "elapsedTime": 738921, - "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0003_01_000001/root", - "amHostHttpAddress": "yarnnm:8042", - "allocatedMB": 1024, - "allocatedVCores": 2, - "runningContainers": 2, - "memorySeconds": 749045, - "vcoreSeconds": 1462, - "preemptedResourceMB": 0, - "preemptedResourceVCores": 0, - "numNonAMContainerPreempted": 0, - "numAMContainerPreempted": 0, - "resourceRequests": [ - { - "capability": { - "memory": 512, - "virtualCores": 1 - }, - "nodeLabelExpression": "", - "numContainers": 0, - "priority": { - "priority": 0 - }, - "relaxLocality": true, - "resourceName": "*" - }, - { - "capability": { - "memory": 512, - "virtualCores": 1 - }, - "nodeLabelExpression": "", - "numContainers": 0, - "priority": { - "priority": 0 - }, - "relaxLocality": true, - "resourceName": "/default-rack" - } - ] - }, - { - "id": "application_1459790549146_0002", - "user": "root", - "name": "protodeserializer_1", - "queue": "default", - "state": "KILLED", - "finalStatus": "KILLED", - "progress": 100, - "trackingUI": "History", - "trackingUrl": "http://yarnrm:8088/cluster/app/application_1459790549146_0002", - "diagnostics": "Application killed by user.", - "clusterId": 1459790549146, - "applicationType": "Samza", - "applicationTags": "", - "startedTime": 1459791820396, - "finishedTime": 1459792284264, - "elapsedTime": 463868, - "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0002_01_000001/root", - "amHostHttpAddress": "yarnnm:8042", - "allocatedMB": -1, - "allocatedVCores": -1, - "runningContainers": -1, - "memorySeconds": 462177, - "vcoreSeconds": 902, - "preemptedResourceMB": 0, - "preemptedResourceVCores": 0, - "numNonAMContainerPreempted": 0, - "numAMContainerPreempted": 0 - }, - { - "id": "application_1459790549146_0001", - "user": "root", - "name": "protodeserializer_1", - "queue": "default", - "state": "KILLED", - "finalStatus": "KILLED", - "progress": 100, - "trackingUI": "History", - "trackingUrl": "http://yarnrm:8088/cluster/app/application_1459790549146_0001", - "diagnostics": "Application killed by user.", - "clusterId": 1459790549146, - "applicationType": "Samza", - "applicationTags": "", - "startedTime": 1459791108916, - "finishedTime": 1459791813659, - "elapsedTime": 704743, - "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0001_01_000001/root", - "amHostHttpAddress": "yarnnm:8042", - "allocatedMB": -1, - "allocatedVCores": -1, - "runningContainers": -1, - "memorySeconds": 711605, - "vcoreSeconds": 1389, - "preemptedResourceMB": 0, - "preemptedResourceVCores": 0, - "numNonAMContainerPreempted": 0, - "numAMContainerPreempted": 0 - } - ] - } -} \ No newline at end of file diff --git a/samza-shell/src/main/bash/run-config-manager.sh b/samza-shell/src/main/bash/run-config-manager.sh deleted file mode 100755 index 96777e7105..0000000000 --- a/samza-shell/src/main/bash/run-config-manager.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -if [[ -n $(find "$base_dir/lib" -regex ".*samza-log4j2.*.jar*") ]]; then - [[ $JAVA_OPTS != *-Dlog4j.configurationFile* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$(dirname $0)/log4j2-console.xml" -elif [[ -n $(find "$base_dir/lib" -regex ".*samza-log4j.*.jar*") ]]; then - [[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" -fi - -exec $(dirname $0)/run-class.sh org.apache.samza.autoscaling.deployer.ConfigManager "$@" diff --git a/settings.gradle b/settings.gradle index c636706d71..cf4c9be7a3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -23,7 +23,6 @@ include \ 'samza-shell' def scalaModules = [ - 'samza-autoscaling', 'samza-aws', 'samza-azure', 'samza-core', diff --git a/sonar-project.properties b/sonar-project.properties index 6e420dad57..2e9c7bed16 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -32,4 +32,4 @@ sonar.tests=src/test sonar.jacoco.reportPaths=build/jacoco/test.exec # List of subprojects here -sonar.modules=samza-api,samza-autoscaling,samza-azure,samza-core,samza-elasticsearch,samza-hdfs,samza-kafka,samza-kv-inmemory,samza-kv-rocksdb,samza-kv-couchbase,samza-kv,samza-log4j,samza-rest,samza-shell,samza-test,samza-yarn +sonar.modules=samza-api,samza-azure,samza-core,samza-elasticsearch,samza-hdfs,samza-kafka,samza-kv-inmemory,samza-kv-rocksdb,samza-kv-couchbase,samza-kv,samza-log4j,samza-rest,samza-shell,samza-test,samza-yarn