From 829c3c3c3e1258fd2aeac9424878f1a155df1d40 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 12 May 2023 19:29:57 +0530 Subject: [PATCH 01/12] Do not retry SQL operation in case of max_allowed_packet exception --- .../parallel/ParallelIndexPhaseRunner.java | 2 +- .../MetadataStorageConnectorConfig.java | 2 +- .../MetadataStorageConnectorConfigTest.java | 9 +- .../druid/metadata/SQLMetadataConnector.java | 24 ++-- .../metadata/SQLMetadataConnectorTest.java | 132 +++++++++--------- 5 files changed, 89 insertions(+), 80 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index 4725b43d64f1..3b725388f94b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -242,7 +242,7 @@ private void submitNewTask( SubTaskSpec spec ) { - LOG.info("Submit a new task for spec[%s]", spec.getId()); + LOG.info("Submitting a new task for spec[%s]", spec.getId()); final ListenableFuture> future = taskMonitor.submit(spec); Futures.addCallback( future, diff --git a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java b/processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java index 2e6767f0b77e..38f06f0b53c0 100644 --- a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java +++ b/processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java @@ -56,7 +56,7 @@ public static MetadataStorageConnectorConfig create( String connectUri, String user, String password, - Map properties + Map properties ) { MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig(); diff --git a/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java b/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java index 03a10c9540ae..2d34af4928b1 100644 --- a/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java +++ b/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java @@ -77,8 +77,8 @@ public void testEquals() throws IOException "user", "\"nothing\"" ); - Assert.assertTrue(metadataStorageConnectorConfig.equals(metadataStorageConnectorConfig2)); - Assert.assertTrue(metadataStorageConnectorConfig.hashCode() == metadataStorageConnectorConfig2.hashCode()); + Assert.assertEquals(metadataStorageConnectorConfig, metadataStorageConnectorConfig2); + Assert.assertEquals(metadataStorageConnectorConfig.hashCode(), metadataStorageConnectorConfig2.hashCode()); } private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); @@ -193,12 +193,13 @@ private void testDbcpPropertiesFile( @Test public void testCreate() { - Map props = ImmutableMap.of("key", "value"); + Map props = ImmutableMap.of("key", "value"); MetadataStorageConnectorConfig config = MetadataStorageConnectorConfig.create( "connectURI", "user", "pwd", - props); + props + ); Assert.assertEquals("connectURI", config.getConnectURI()); Assert.assertEquals("user", config.getUser()); Assert.assertEquals("pwd", config.getPassword()); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 2e24f97aa1c3..9d9910d2c733 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -59,6 +59,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector private static final Logger log = new Logger(SQLMetadataConnector.class); private static final String PAYLOAD_TYPE = "BLOB"; private static final String COLLATION = ""; + private static final String MAX_ALLOWED_PACKET_ERROR = "query size is >= to max_allowed_packet"; static final int DEFAULT_MAX_TRIES = 10; @@ -165,14 +166,21 @@ public T retryTransaction(final TransactionCallback callback, final int q public final boolean isTransientException(Throwable e) { - return e != null && (e instanceof RetryTransactionException - || e instanceof SQLTransientException - || e instanceof SQLRecoverableException - || e instanceof UnableToObtainConnectionException - || e instanceof UnableToExecuteStatementException - || connectorIsTransientException(e) - || (e instanceof SQLException && isTransientException(e.getCause())) - || (e instanceof DBIException && isTransientException(e.getCause()))); + if (e == null) { + return false; + } + if (e.getMessage() != null && e.getMessage().contains(MAX_ALLOWED_PACKET_ERROR)) { + return false; + } + + return e instanceof RetryTransactionException + || e instanceof SQLTransientException + || e instanceof SQLRecoverableException + || e instanceof UnableToObtainConnectionException + || e instanceof UnableToExecuteStatementException + || connectorIsTransientException(e) + || (e instanceof SQLException && isTransientException(e.getCause())) + || (e instanceof DBIException && isTransientException(e.getCause())); } /** diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java index 1c192da475d5..49df405ef759 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java @@ -19,9 +19,9 @@ package org.apache.druid.metadata; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; @@ -30,14 +30,20 @@ import org.junit.Test; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; +import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; import org.skife.jdbi.v2.tweak.HandleCallback; -import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; +import java.sql.SQLRecoverableException; +import java.sql.SQLTransientConnectionException; +import java.sql.SQLTransientException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; public class SQLMetadataConnectorTest @@ -47,7 +53,6 @@ public class SQLMetadataConnectorTest private TestDerbyConnector connector; private MetadataStorageTablesConfig tablesConfig; - private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); @Before public void setUp() @@ -213,79 +218,74 @@ protected BasicDataSource getDatasource() } } - private MetadataStorageConnectorConfig getDbcpPropertiesFile( - boolean createTables, - String host, - int port, - String connectURI, - String user, - String pwdString, - String pwd - ) throws Exception - { - return JSON_MAPPER.readValue( - "{" + - "\"createTables\": \"" + createTables + "\"," + - "\"host\": \"" + host + "\"," + - "\"port\": \"" + port + "\"," + - "\"connectURI\": \"" + connectURI + "\"," + - "\"user\": \"" + user + "\"," + - "\"password\": " + pwdString + "," + - "\"dbcp\": {\n" + - " \"maxConnLifetimeMillis\" : 1200000,\n" + - " \"defaultQueryTimeout\" : \"30000\"\n" + - "}" + - "}", - MetadataStorageConnectorConfig.class - ); - } - @Test - public void testBasicDataSourceCreation() throws Exception + public void testBasicDataSourceCreation() { - MetadataStorageConnectorConfig config = getDbcpPropertiesFile( - true, - "host", - 1234, - "connectURI", - "user", - "{\"type\":\"default\",\"password\":\"nothing\"}", - "nothing" + Map props = ImmutableMap.of( + "maxConnLifetimeMillis", "1200000", + "defaultQueryTimeout", "30000" ); + MetadataStorageConnectorConfig config = + MetadataStorageConnectorConfig.create("connectURI", "user", "password", props); + TestSQLMetadataConnector testSQLMetadataConnector = new TestSQLMetadataConnector( Suppliers.ofInstance(config), Suppliers.ofInstance(tablesConfig) ); BasicDataSource dataSource = testSQLMetadataConnector.getDatasource(); Assert.assertEquals(dataSource.getMaxConnLifetimeMillis(), 1200000); - Assert.assertEquals((long) dataSource.getDefaultQueryTimeout(), 30000); + Assert.assertEquals(dataSource.getDefaultQueryTimeout().intValue(), 30000); } - private boolean verifyTaskTypeAndGroupId(String table, String id, String type, String groupId) + @Test + public void testIsTransientException() { - try { - return connector.retryWithHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws SQLException - { - Statement statement = handle.getConnection().createStatement(); - ResultSet resultSet = statement.executeQuery( - StringUtils.format("SELECT * FROM %1$s WHERE id = '%2$s'", table, id) - ); - resultSet.next(); - boolean flag = type.equals(resultSet.getString("type")) - && groupId.equals(resultSet.getString("group_id")); - statement.close(); - return flag; - } - } - ); - } - catch (Exception e) { - e.printStackTrace(); - return false; - } + MetadataStorageConnectorConfig config = + MetadataStorageConnectorConfig.create("connectURI", "user", "password", Collections.emptyMap()); + TestSQLMetadataConnector metadataConnector = new TestSQLMetadataConnector( + Suppliers.ofInstance(config), + Suppliers.ofInstance(tablesConfig) + ); + + // Transient exceptions + Assert.assertTrue(metadataConnector.isTransientException(new SQLTransientException())); + Assert.assertTrue(metadataConnector.isTransientException(new RetryTransactionException(""))); + Assert.assertTrue(metadataConnector.isTransientException(new SQLRecoverableException())); + Assert.assertTrue(metadataConnector.isTransientException(new UnableToExecuteStatementException(""))); + Assert.assertTrue( + metadataConnector.isTransientException(new UnableToObtainConnectionException(new Exception())) + ); + + // Non transient exceptions + Assert.assertFalse(metadataConnector.isTransientException(null)); + Assert.assertFalse(metadataConnector.isTransientException(new SQLException())); + Assert.assertFalse( + metadataConnector.isTransientException(new CallbackFailedException(new Exception())) + ); + Assert.assertFalse( + metadataConnector.isTransientException( + new SQLTransientConnectionException("Could not send query: query size is >= to max_allowed_packet") + ) + ); + + // Nested transient exceptions + Assert.assertTrue( + metadataConnector.isTransientException( + new CallbackFailedException(new SQLTransientException()) + ) + ); + + // Nested non-transient exceptions + Assert.assertFalse( + metadataConnector.isTransientException( + new CallbackFailedException( + new UnableToExecuteStatementException( + new SQLTransientConnectionException( + "Could not send query: query size is >= to max_allowed_packet" + ) + ) + ) + ) + ); } } From 05e91182b5dd68f7ed9577a4b866192e0c436b9e Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 23 May 2023 10:15:16 +0530 Subject: [PATCH 02/12] Add method SQLMetadataConnector.connectorIsNonTransientException --- .../storage/mysql/MySQLConnector.java | 23 ++++- .../storage/mysql/MySQLConnectorTest.java | 8 +- .../druid/metadata/SQLMetadataConnector.java | 50 ++++++++--- .../metadata/SQLMetadataConnectorTest.java | 88 ++++++++----------- 4 files changed, 99 insertions(+), 70 deletions(-) diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java index 4098e30a5441..47da9f31f796 100644 --- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java +++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java @@ -46,7 +46,10 @@ public class MySQLConnector extends SQLMetadataConnector private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT"; private static final String QUOTE_STRING = "`"; private static final String COLLATION = "CHARACTER SET utf8mb4 COLLATE utf8mb4_bin"; - private static final String MYSQL_TRANSIENT_EXCEPTION_CLASS_NAME = "com.mysql.jdbc.exceptions.MySQLTransientException"; + private static final String MYSQL_TRANSIENT_EXCEPTION_CLASS_NAME + = "com.mysql.jdbc.exceptions.MySQLTransientException"; + private static final String MARIA_DB_PACKET_EXCEPTION_CLASS_NAME + = "org.mariadb.jdbc.internal.util.exceptions.MaxAllowedPacketException"; @Nullable private final Class myTransientExceptionClass; @@ -218,6 +221,24 @@ protected boolean connectorIsTransientException(Throwable e) return false; } + @Override + protected boolean connectorIsNonTransientException(Throwable e) + { + // Check only for packet exception as other non-transient exceptions would + // already be filtered out + return isRootCausePacketException(e); + } + + private boolean isRootCausePacketException(Throwable t) + { + if (t == null) { + return false; + } + + return MARIA_DB_PACKET_EXCEPTION_CLASS_NAME.equals(t.getClass().getName()) + || isRootCausePacketException(t.getCause()); + } + @Override public Void insertOrUpdate( final String tableName, diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java index 23ce46282232..52a6d2187ae7 100644 --- a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java +++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java @@ -59,10 +59,10 @@ public void testIsExceptionTransientMySql() Assert.assertTrue(connector.connectorIsTransientException(new MySQLTransientException())); Assert.assertTrue(connector.connectorIsTransientException(new MySQLTransactionRollbackException())); Assert.assertTrue( - connector.connectorIsTransientException(new SQLException("some transient failure", "wtf", 1317)) + connector.connectorIsTransientException(new SQLException("some transient failure", "s0", 1317)) ); Assert.assertFalse( - connector.connectorIsTransientException(new SQLException("totally realistic test data", "wtf", 1337)) + connector.connectorIsTransientException(new SQLException("totally realistic test data", "s0", 1337)) ); // this method does not specially handle normal transient exceptions either, since it is not vendor specific Assert.assertFalse( @@ -82,10 +82,10 @@ public void testIsExceptionTransientNoMySqlClazz() // no vendor specific for MariaDb, so should always be false Assert.assertFalse(connector.connectorIsTransientException(new MySQLTransientException())); Assert.assertFalse( - connector.connectorIsTransientException(new SQLException("some transient failure", "wtf", 1317)) + connector.connectorIsTransientException(new SQLException("some transient failure", "s0", 1317)) ); Assert.assertFalse( - connector.connectorIsTransientException(new SQLException("totally realistic test data", "wtf", 1337)) + connector.connectorIsTransientException(new SQLException("totally realistic test data", "s0", 1337)) ); Assert.assertFalse( connector.connectorIsTransientException(new SQLTransientConnectionException("transient")) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 9d9910d2c733..36196dc2cc73 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -59,8 +59,6 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector private static final Logger log = new Logger(SQLMetadataConnector.class); private static final String PAYLOAD_TYPE = "BLOB"; private static final String COLLATION = ""; - private static final String MAX_ALLOWED_PACKET_ERROR = "query size is >= to max_allowed_packet"; - static final int DEFAULT_MAX_TRIES = 10; private final Supplier config; @@ -164,33 +162,62 @@ public T retryTransaction(final TransactionCallback callback, final int q } } + /** + * Returns true for errors that are definitely transient. An error can be + * confirmed to be transient iff: + *
    + *
  • Either: {@link #connectorIsTransientException(Throwable)} returns true
  • + *
  • Or: {@link #couldBeTransientException(Throwable)} returns true and + * {@link #connectorIsNonTransientException(Throwable)} returns false.
  • + *
+ */ public final boolean isTransientException(Throwable e) + { + return connectorIsTransientException(e) + || (couldBeTransientException(e) && !connectorIsNonTransientException(e)); + } + + /** + * Checks if the given error could potentially be transient. + * An error for which this method returns true is then tested against + * {@link #connectorIsNonTransientException(Throwable)} for confirmation. + */ + private boolean couldBeTransientException(Throwable e) { if (e == null) { return false; } - if (e.getMessage() != null && e.getMessage().contains(MAX_ALLOWED_PACKET_ERROR)) { - return false; - } return e instanceof RetryTransactionException || e instanceof SQLTransientException || e instanceof SQLRecoverableException || e instanceof UnableToObtainConnectionException - || e instanceof UnableToExecuteStatementException - || connectorIsTransientException(e) + || (e instanceof UnableToExecuteStatementException && isTransientException(e.getCause())) || (e instanceof SQLException && isTransientException(e.getCause())) || (e instanceof DBIException && isTransientException(e.getCause())); } /** - * Vendor specific errors that are not covered by {@link #isTransientException(Throwable)} + * Vendor specific errors that are not covered by {@link #couldBeTransientException(Throwable)}. + * This method takes precedence over {@link #connectorIsNonTransientException(Throwable)} + * i.e. an error for which both of these methods return true is categorized + * as transient. */ protected boolean connectorIsTransientException(Throwable e) { return false; } + /** + * Returns true if a given vendor specific error is definitely not transient. + * This method is called only for errors that are potentially transient, + * i.e. errors for which {@link #couldBeTransientException(Throwable)} returns true. + */ + protected boolean connectorIsNonTransientException(Throwable e) + { + return false; + } + public void createTable(final String tableName, final Iterable sql) { try { @@ -344,12 +371,7 @@ public boolean tableContainsColumn(Handle handle, String table, String column) { try { DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData(); - ResultSet columns = databaseMetaData.getColumns( - null, - null, - table, - column - ); + ResultSet columns = databaseMetaData.getColumns(null, null, table, column); return columns.next(); } catch (SQLException e) { diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java index 49df405ef759..d9df47920da2 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java @@ -33,7 +33,6 @@ import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.sql.SQLException; import java.sql.SQLRecoverableException; @@ -83,28 +82,23 @@ public void testCreateTables() connector.createSupervisorsTable(); connector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - for (String table : tables) { - Assert.assertTrue( - StringUtils.format("table %s was not created!", table), - connector.tableExists(handle, table) - ); - } - - String taskTable = tablesConfig.getTasksTable(); - for (String column : Arrays.asList("type", "group_id")) { - Assert.assertTrue( - StringUtils.format("Tasks table column %s was not created!", column), - connector.tableContainsColumn(handle, taskTable, column) - ); - } - - return null; + handle -> { + for (String table : tables) { + Assert.assertTrue( + StringUtils.format("table %s was not created!", table), + connector.tableExists(handle, table) + ); } + + String taskTable = tablesConfig.getTasksTable(); + for (String column : Arrays.asList("type", "group_id")) { + Assert.assertTrue( + StringUtils.format("Tasks table column %s was not created!", column), + connector.tableContainsColumn(handle, taskTable, column) + ); + } + + return null; } ); @@ -152,16 +146,8 @@ public void testInsertOrUpdate() private void dropTable(final String tableName) { connector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) - .execute(); - return null; - } - } + handle -> handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) + .execute() ); } @@ -248,44 +234,44 @@ public void testIsTransientException() ); // Transient exceptions - Assert.assertTrue(metadataConnector.isTransientException(new SQLTransientException())); Assert.assertTrue(metadataConnector.isTransientException(new RetryTransactionException(""))); Assert.assertTrue(metadataConnector.isTransientException(new SQLRecoverableException())); - Assert.assertTrue(metadataConnector.isTransientException(new UnableToExecuteStatementException(""))); - Assert.assertTrue( - metadataConnector.isTransientException(new UnableToObtainConnectionException(new Exception())) - ); + Assert.assertTrue(metadataConnector.isTransientException(new SQLTransientException())); + Assert.assertTrue(metadataConnector.isTransientException(new SQLTransientConnectionException())); // Non transient exceptions Assert.assertFalse(metadataConnector.isTransientException(null)); Assert.assertFalse(metadataConnector.isTransientException(new SQLException())); - Assert.assertFalse( - metadataConnector.isTransientException(new CallbackFailedException(new Exception())) + Assert.assertFalse(metadataConnector.isTransientException(new UnableToExecuteStatementException(""))); + + // Nested transient exceptions + Assert.assertTrue( + metadataConnector.isTransientException( + new CallbackFailedException(new SQLTransientException()) + ) ); - Assert.assertFalse( + Assert.assertTrue( metadataConnector.isTransientException( - new SQLTransientConnectionException("Could not send query: query size is >= to max_allowed_packet") + new UnableToObtainConnectionException(new SQLException()) ) ); - - // Nested transient exceptions Assert.assertTrue( metadataConnector.isTransientException( - new CallbackFailedException(new SQLTransientException()) + new UnableToExecuteStatementException(new SQLTransientException()) ) ); // Nested non-transient exceptions Assert.assertFalse( metadataConnector.isTransientException( - new CallbackFailedException( - new UnableToExecuteStatementException( - new SQLTransientConnectionException( - "Could not send query: query size is >= to max_allowed_packet" - ) - ) - ) + new CallbackFailedException(new SQLException()) + ) + ); + Assert.assertFalse( + metadataConnector.isTransientException( + new UnableToExecuteStatementException(new SQLException()) ) ); + } } From 03aac042920369bbc4ac74dbe0701a110ebc5e81 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 29 May 2023 17:12:21 +0530 Subject: [PATCH 03/12] Add DruidException with response codes --- .../storage/mysql/MySQLConnector.java | 17 ++--- .../storage/mysql/MySQLConnectorTest.java | 28 +++++++ .../overlord/HeapMemoryTaskStorage.java | 2 +- .../overlord/MetadataTaskStorage.java | 55 ++++++-------- .../overlord/http/OverlordResource.java | 34 ++++----- .../common/exception/DruidException.java | 76 +++++++++++++++++++ .../druid/metadata/EntryExistsException.java | 17 +++-- .../druid/metadata/SQLMetadataConnector.java | 53 ++++--------- .../SQLMetadataStorageActionHandler.java | 62 +++++++++++---- 9 files changed, 223 insertions(+), 121 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/common/exception/DruidException.java diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java index 47da9f31f796..91abb44380fa 100644 --- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java +++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java @@ -50,6 +50,8 @@ public class MySQLConnector extends SQLMetadataConnector = "com.mysql.jdbc.exceptions.MySQLTransientException"; private static final String MARIA_DB_PACKET_EXCEPTION_CLASS_NAME = "org.mariadb.jdbc.internal.util.exceptions.MaxAllowedPacketException"; + private static final String MYSQL_PACKET_EXCEPTION_CLASS_NAME + = "com.mysql.jdbc.PacketTooBigException"; @Nullable private final Class myTransientExceptionClass; @@ -222,21 +224,16 @@ protected boolean connectorIsTransientException(Throwable e) } @Override - protected boolean connectorIsNonTransientException(Throwable e) - { - // Check only for packet exception as other non-transient exceptions would - // already be filtered out - return isRootCausePacketException(e); - } - - private boolean isRootCausePacketException(Throwable t) + protected boolean isRootCausePacketTooBigException(Throwable t) { if (t == null) { return false; } - return MARIA_DB_PACKET_EXCEPTION_CLASS_NAME.equals(t.getClass().getName()) - || isRootCausePacketException(t.getCause()); + final String className = t.getClass().getName(); + return MARIA_DB_PACKET_EXCEPTION_CLASS_NAME.equals(className) + || MYSQL_PACKET_EXCEPTION_CLASS_NAME.equals(className) + || isRootCausePacketTooBigException(t.getCause()); } @Override diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java index 52a6d2187ae7..514d01b6a59a 100644 --- a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java +++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java @@ -29,6 +29,7 @@ import java.sql.SQLException; import java.sql.SQLTransientConnectionException; +import java.sql.SQLTransientException; public class MySQLConnectorTest { @@ -92,6 +93,33 @@ public void testIsExceptionTransientNoMySqlClazz() ); } + @Test + public void testIsRootCausePacketTooBigException() + { + MySQLConnector connector = new MySQLConnector( + CONNECTOR_CONFIG_SUPPLIER, + TABLES_CONFIG_SUPPLIER, + new MySQLConnectorSslConfig(), + MYSQL_DRIVER_CONFIG + ); + + // The test method should return true only for + // mariadb.MaxAllowedPacketException or mysql.PacketTooBigException. + // Verifying this requires creating a mock Class object, but Class is final + // and has only a private constructor. It would be overkill to try to mock it. + + // Verify some of the false cases + Assert.assertFalse( + connector.isRootCausePacketTooBigException(new SQLException()) + ); + Assert.assertFalse( + connector.isRootCausePacketTooBigException(new SQLTransientException()) + ); + Assert.assertFalse( + connector.isRootCausePacketTooBigException(new MySQLTransientException()) + ); + } + @Test public void testLimitClause() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 8ef5970982ad..efbd3ef181f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -90,7 +90,7 @@ public void insert(Task task, TaskStatus status) throws EntryExistsException TaskStuff newTaskStuff = new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource()); TaskStuff alreadyExisted = tasks.putIfAbsent(task.getId(), newTaskStuff); if (alreadyExisted != null) { - throw new EntryExistsException(task.getId()); + throw new EntryExistsException("Task", task.getId()); } log.info("Inserted task %s with status: %s", task.getId(), status); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 7c7fbd3f53c8..0855bd56f9b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -20,13 +20,13 @@ package org.apache.druid.indexing.overlord; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.inject.Inject; +import org.apache.druid.common.exception.DruidException; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; @@ -139,7 +139,14 @@ public void insert(final Task task, final TaskStatus status) throws EntryExistsE status.getId() ); - log.info("Inserting task %s with status: %s", task.getId(), status); + /*if (task.getType().contains("merge")) { + log.info("Kashif bloating up task [%s]", task.getId()); + for (int i = 0; i < 10000; ++i) { + task.getContext().put("key_" + i, "value_" + i); + } + }*/ + + log.info("Inserting task [%s] with status [%s].", task.getId(), status); try { handler.insert( @@ -153,12 +160,11 @@ public void insert(final Task task, final TaskStatus status) throws EntryExistsE task.getGroupId() ); } + catch (DruidException e) { + throw e; + } catch (Exception e) { - if (e instanceof EntryExistsException) { - throw (EntryExistsException) e; - } else { - throw new RuntimeException(e); - } + throw new RuntimeException(e); } } @@ -167,15 +173,12 @@ public void setStatus(final TaskStatus status) { Preconditions.checkNotNull(status, "status"); - log.info("Updating task %s to status: %s", status.getId(), status); + final String taskId = status.getId(); + log.info("Updating status of task [%s] to [%s].", taskId, status); - final boolean set = handler.setStatus( - status.getId(), - status.isRunnable(), - status - ); + final boolean set = handler.setStatus(taskId, status.isRunnable(), status); if (!set) { - throw new ISE("Active task not found: %s", status.getId()); + throw new ISE("No active task for id [%s]", taskId); } } @@ -279,10 +282,8 @@ public void addLock(final String taskid, final TaskLock taskLock) Preconditions.checkNotNull(taskLock, "taskLock"); log.info( - "Adding lock on interval[%s] version[%s] for task: %s", - taskLock.getInterval(), - taskLock.getVersion(), - taskid + "Adding lock on interval[%s] version[%s] for task [%s].", + taskLock.getInterval(), taskLock.getVersion(), taskid ); handler.addLock(taskid, taskLock); @@ -296,15 +297,13 @@ public void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock) Preconditions.checkNotNull(newLock, "newLock"); log.info( - "Replacing an existing lock[%s] with a new lock[%s] for task: %s", - oldLock, - newLock, - taskid + "Replacing an existing lock[%s] with a new lock[%s] for task [%s].", + oldLock, newLock, taskid ); final Long oldLockId = handler.getLockId(taskid, oldLock); if (oldLockId == null) { - throw new ISE("Cannot find an existing lock[%s]", oldLock); + throw new ISE("Cannot find lock[%s] for task [%s]", oldLock, taskid); } handler.replaceLock(taskid, oldLockId, newLock); @@ -336,14 +335,8 @@ public List getLocks(String taskid) { return ImmutableList.copyOf( Iterables.transform( - getLocksWithIds(taskid).entrySet(), new Function, TaskLock>() - { - @Override - public TaskLock apply(Map.Entry e) - { - return e.getValue(); - } - } + getLocksWithIds(taskid).entrySet(), + Entry::getValue ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index a64b67b3b834..5b130585c40c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -36,6 +36,7 @@ import org.apache.druid.client.indexing.IndexingWorkerInfo; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.common.exception.DruidException; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; @@ -64,7 +65,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; @@ -222,25 +222,19 @@ public Response taskPost(final Task task, @Context final HttpServletRequest req) return asLeaderWith( taskMaster.getTaskQueue(), - new Function() - { - @Override - public Response apply(TaskQueue taskQueue) - { - try { - taskQueue.add(task); - return Response.ok(ImmutableMap.of("task", task.getId())).build(); - } - catch (EntryExistsException e) { - return Response.status(Response.Status.BAD_REQUEST) - .entity( - ImmutableMap.of( - "error", - StringUtils.format("Task[%s] already exists!", task.getId()) - ) - ) - .build(); - } + taskQueue -> { + try { + taskQueue.add(task); + return Response.ok(ImmutableMap.of("task", task.getId())).build(); + } + catch (DruidException e) { + return Response.status(e.getResponseCode()) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + catch (Exception e) { + log.info("Got final exception [%s]", e.getClass()); + throw e; } } ); diff --git a/processing/src/main/java/org/apache/druid/common/exception/DruidException.java b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java new file mode 100644 index 000000000000..905ce11449d5 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.exception; + +/** + * A generic exception thrown by Druid. + */ +public class DruidException extends RuntimeException +{ + public static final int DEFAULT_HTTP_CODE = 500; + public static final int HTTP_CODE_BAD_REQUEST = 400; + + private static final boolean DEFAULT_CAN_RETRY = false; + + private final int responseCode; + private final boolean isTransient; + + public DruidException(String message) + { + this(message, DEFAULT_HTTP_CODE, null, DEFAULT_CAN_RETRY); + } + + public DruidException(String message, Throwable cause) + { + this(message, DEFAULT_HTTP_CODE, cause, DEFAULT_CAN_RETRY); + } + + public DruidException(String message, int responseCode) + { + this(message, responseCode, null, DEFAULT_CAN_RETRY); + } + + public DruidException(String message, int responseCode, Throwable cause) + { + this(message, responseCode, cause, false); + } + + public DruidException(String message, int responseCode, Throwable cause, boolean isTransient) + { + super(message, cause); + this.responseCode = responseCode; + this.isTransient = isTransient; + } + + public int getResponseCode() + { + return responseCode; + } + + /** + * Returns true if this is a transient exception and might go away if the + * operation is retried. All DruidException are non-transient by default + * unless specified otherwise. + */ + public boolean isTransient() + { + return isTransient; + } +} diff --git a/processing/src/main/java/org/apache/druid/metadata/EntryExistsException.java b/processing/src/main/java/org/apache/druid/metadata/EntryExistsException.java index 76d71de217b1..f8a7d860d53a 100644 --- a/processing/src/main/java/org/apache/druid/metadata/EntryExistsException.java +++ b/processing/src/main/java/org/apache/druid/metadata/EntryExistsException.java @@ -19,19 +19,26 @@ package org.apache.druid.metadata; +import org.apache.druid.common.exception.DruidException; import org.apache.druid.java.util.common.StringUtils; -public class EntryExistsException extends Exception +/** + * A non-transient Druid metadata exception thrown when trying to insert a + * duplicate entry in the metadata. + */ +public class EntryExistsException extends DruidException { - public EntryExistsException(String entryId, Throwable t) + private static final int HTTP_BAD_REQUEST = 400; + + public EntryExistsException(String entryType, String entryId) { - super(StringUtils.format("Entry already exists: %s", entryId), t); + this(entryType, entryId, null); } - public EntryExistsException(String entryId) + public EntryExistsException(String entryType, String entryId, Throwable t) { - this(entryId, null); + super(StringUtils.format("%s [%s] already exists.", entryType, entryId), HTTP_BAD_REQUEST, t, false); } } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 36196dc2cc73..2a20c8f72214 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -162,46 +162,20 @@ public T retryTransaction(final TransactionCallback callback, final int q } } - /** - * Returns true for errors that are definitely transient. An error can be - * confirmed to be transient iff: - *
    - *
  • Either: {@link #connectorIsTransientException(Throwable)} returns true
  • - *
  • Or: {@link #couldBeTransientException(Throwable)} returns true and - * {@link #connectorIsNonTransientException(Throwable)} returns false.
  • - *
- */ public final boolean isTransientException(Throwable e) { - return connectorIsTransientException(e) - || (couldBeTransientException(e) && !connectorIsNonTransientException(e)); + return e != null && (e instanceof RetryTransactionException + || e instanceof SQLTransientException + || e instanceof SQLRecoverableException + || e instanceof UnableToObtainConnectionException + || (e instanceof UnableToExecuteStatementException && isTransientException(e.getCause())) + || connectorIsTransientException(e) + || (e instanceof SQLException && isTransientException(e.getCause())) + || (e instanceof DBIException && isTransientException(e.getCause()))); } /** - * Checks if the given error could potentially be transient. - * An error for which this method returns true is then tested against - * {@link #connectorIsNonTransientException(Throwable)} for confirmation. - */ - private boolean couldBeTransientException(Throwable e) - { - if (e == null) { - return false; - } - - return e instanceof RetryTransactionException - || e instanceof SQLTransientException - || e instanceof SQLRecoverableException - || e instanceof UnableToObtainConnectionException - || (e instanceof UnableToExecuteStatementException && isTransientException(e.getCause())) - || (e instanceof SQLException && isTransientException(e.getCause())) - || (e instanceof DBIException && isTransientException(e.getCause())); - } - - /** - * Vendor specific errors that are not covered by {@link #couldBeTransientException(Throwable)}. - * This method takes precedence over {@link #connectorIsNonTransientException(Throwable)} - * i.e. an error for which both of these methods return true is categorized - * as transient. + * Vendor specific errors that are not covered by {@link #isTransientException(Throwable)}. */ protected boolean connectorIsTransientException(Throwable e) { @@ -209,11 +183,12 @@ protected boolean connectorIsTransientException(Throwable e) } /** - * Returns true if a given vendor specific error is definitely not transient. - * This method is called only for errors that are potentially transient, - * i.e. errors for which {@link #couldBeTransientException(Throwable)} returns true. + * Checks if the root cause of the given exception is a PacketTooBigException. + * + * @return false by default. Specific implementations should override this method + * to correctly classify their packet exceptions. */ - protected boolean connectorIsNonTransientException(Throwable e) + protected boolean isRootCausePacketTooBigException(Throwable t) { return false; } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 436af7ad694a..24af19471979 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -28,6 +28,7 @@ import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import org.apache.druid.common.exception.DruidException; import org.apache.druid.indexer.TaskIdentifier; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.java.util.common.DateTimes; @@ -164,8 +165,9 @@ public void insert( ) throws EntryExistsException { try { - getConnector().retryWithHandle( - (HandleCallback) handle -> { + executeWithRetryForTask( + id, + handle -> { final String sql = StringUtils.format( "INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) " + "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)", @@ -182,17 +184,12 @@ public void insert( .bind("status_payload", jsonMapper.writeValueAsBytes(status)) .execute(); return null; - }, - e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent()) + } ); } catch (Exception e) { - if (isStatementException(e) && getEntry(id).isPresent()) { - throw new EntryExistsException(id, e); - } else { - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); - } + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); } } @@ -304,10 +301,7 @@ public List> getTaskInfos( final Query> query; switch (entry.getKey()) { case ACTIVE: - query = createActiveTaskStreamingQuery( - handle, - dataSource - ); + query = createActiveTaskStreamingQuery(handle, dataSource); tasks.addAll(query.map(taskInfoMapper).list()); break; case COMPLETE: @@ -390,6 +384,44 @@ List> getTaskStatusList( ); } + /** + * Executes the given callback for the specified taskId. If the execution fails, + * the exception thrown is always a DruidException. If the thrown DruidException + * is found to be transient, the execution is retried. + */ + private void executeWithRetryForTask(String taskId, HandleCallback callback) + { + connector.retryWithHandle( + handle -> { + try { + return callback.withHandle(handle); + } + catch (Throwable t) { + if (isStatementException(t) && getEntry(taskId).isPresent()) { + throw new EntryExistsException("Task", taskId); + } else if (connector.isRootCausePacketTooBigException(t)) { + throw new DruidException( + StringUtils.format( + "Payload for task [%s] exceeds the packet limit." + + " Update the max_allowed_packet on your metadata store" + + " server or in the connection properties.", + taskId + ), + DruidException.HTTP_CODE_BAD_REQUEST, + t + ); + } else { + throw new DruidException( + StringUtils.format("Encountered metadata exception for task [%s]", taskId), + t + ); + } + } + }, + e -> e instanceof DruidException && ((DruidException) e).isTransient() + ); + } + /** * Fetches the columns needed to build TaskStatusPlus for completed tasks * Please note that this requires completion of data migration to avoid empty values for task type and groupId @@ -1016,7 +1048,7 @@ public void populateTaskTypeAndGroupIdAsync() { ExecutorService executorService = Executors.newSingleThreadExecutor(); taskMigrationCompleteFuture = executorService.submit( - () -> populateTaskTypeAndGroupId() + this::populateTaskTypeAndGroupId ); } From a4f754b4c98f25dba3499b8ea09df3bc62a7eda6 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 29 May 2023 17:15:40 +0530 Subject: [PATCH 04/12] Remove test changes --- .../druid/indexing/overlord/MetadataTaskStorage.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 0855bd56f9b3..2735802b2d70 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -139,13 +139,6 @@ public void insert(final Task task, final TaskStatus status) throws EntryExistsE status.getId() ); - /*if (task.getType().contains("merge")) { - log.info("Kashif bloating up task [%s]", task.getId()); - for (int i = 0; i < 10000; ++i) { - task.getContext().put("key_" + i, "value_" + i); - } - }*/ - log.info("Inserting task [%s] with status [%s].", task.getId(), status); try { From 49eaee1f90e698f8b37c33e635d8be441672e64c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 29 May 2023 17:28:16 +0530 Subject: [PATCH 05/12] Fix usage of EntryExistsException --- .../main/java/org/apache/druid/indexing/overlord/TaskQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index c508876f0ce4..0beccce3dd03 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -482,7 +482,7 @@ public boolean add(final Task task) throws EntryExistsException IdUtils.validateId("Task ID", task.getId()); if (taskStorage.getTask(task.getId()).isPresent()) { - throw new EntryExistsException(StringUtils.format("Task %s already exists", task.getId())); + throw new EntryExistsException("Task", task.getId()); } // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec. From c6253b6b3ed4933cf59c84c7fb1aaa6a4a3dfddd Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 29 May 2023 22:07:18 +0530 Subject: [PATCH 06/12] Fix unit tests --- .../SQLMetadataStorageActionHandler.java | 137 +++++++++++------- 1 file changed, 82 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 85ce50343ef7..50701669b72e 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -165,31 +165,62 @@ public void insert( ) throws EntryExistsException { try { - executeWithRetryForTask( - id, - handle -> { - final String sql = StringUtils.format( - "INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) " - + "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)", - getEntryTable() - ); - handle.createStatement(sql) - .bind("id", id) - .bind("created_date", timestamp.toString()) - .bind("datasource", dataSource) - .bind("payload", jsonMapper.writeValueAsBytes(entry)) - .bind("type", type) - .bind("group_id", groupId) - .bind("active", active) - .bind("status_payload", jsonMapper.writeValueAsBytes(status)) - .execute(); - return null; - } + getConnector().retryWithHandle( + handle -> insertEntryWithHandle(handle, id, timestamp, dataSource, entry, active, status, type, groupId), + this::isTransientException ); } + catch (CallbackFailedException e) { + propageError(e.getCause()); + } catch (Exception e) { - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); + propageError(e); + } + } + + private void propageError(Throwable t) + { + Throwables.propagateIfPossible(t); + throw new RuntimeException(t); + } + + /** + * Inserts the given entry into the metadata store. Any exception thrown by a + * HandleCallback is wrapped in a {@link DruidException} wrapped in a + * {@link CallbackFailedException}. + */ + private int insertEntryWithHandle( + Handle handle, + String entryId, + DateTime timestamp, + String dataSource, + EntryType entry, + boolean active, + StatusType status, + String type, + String groupId + ) + { + try { + final String sql = StringUtils.format( + "INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) " + + "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)", + getEntryTable() + ); + handle.createStatement(sql) + .bind("id", entryId) + .bind("created_date", timestamp.toString()) + .bind("datasource", dataSource) + .bind("payload", jsonMapper.writeValueAsBytes(entry)) + .bind("type", type) + .bind("group_id", groupId) + .bind("active", active) + .bind("status_payload", jsonMapper.writeValueAsBytes(status)) + .execute(); + return 1; + } + catch (Throwable t) { + throw wrapInDruidException(entryId, t); } } @@ -199,6 +230,17 @@ public static boolean isStatementException(Throwable e) (e instanceof CallbackFailedException && e.getCause() instanceof StatementException); } + private boolean isTransientException(Throwable t) + { + if (t instanceof CallbackFailedException) { + return isTransientException(t.getCause()); + } else if (t instanceof DruidException) { + return ((DruidException) t).isTransient(); + } + + return false; + } + @Override public boolean setStatus(final String entryId, final boolean active, final StatusType status) { @@ -385,41 +427,26 @@ List> getTaskStatusList( } /** - * Executes the given callback for the specified taskId. If the execution fails, - * the exception thrown is always a DruidException. If the thrown DruidException - * is found to be transient, the execution is retried. + * Wraps the given error in a user friendly DruidException. */ - private void executeWithRetryForTask(String taskId, HandleCallback callback) + private DruidException wrapInDruidException(String taskId, Throwable t) { - connector.retryWithHandle( - handle -> { - try { - return callback.withHandle(handle); - } - catch (Throwable t) { - if (isStatementException(t) && getEntry(taskId).isPresent()) { - throw new EntryExistsException("Task", taskId); - } else if (connector.isRootCausePacketTooBigException(t)) { - throw new DruidException( - StringUtils.format( - "Payload for task [%s] exceeds the packet limit." - + " Update the max_allowed_packet on your metadata store" - + " server or in the connection properties.", - taskId - ), - DruidException.HTTP_CODE_BAD_REQUEST, - t - ); - } else { - throw new DruidException( - StringUtils.format("Encountered metadata exception for task [%s]", taskId), - t - ); - } - } - }, - e -> e instanceof DruidException && ((DruidException) e).isTransient() - ); + if (isStatementException(t) && getEntry(taskId).isPresent()) { + return new EntryExistsException("Task", taskId); + } else if (connector.isRootCausePacketTooBigException(t)) { + return new DruidException( + StringUtils.format( + "Payload for task [%s] exceeds the packet limit." + + " Update the max_allowed_packet on your metadata store" + + " server or in the connection properties.", + taskId + ), + DruidException.HTTP_CODE_BAD_REQUEST, + t + ); + } else { + return new DruidException(StringUtils.format("Encountered metadata exception for task [%s]", taskId), t); + } } /** From 52079847cc63c2330858d18be59cc9483c235a2a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 30 May 2023 08:36:27 +0530 Subject: [PATCH 07/12] Add tests for new exceptions --- .../supervisor/KinesisSupervisorTest.java | 2 +- .../indexing/overlord/TaskLockboxTest.java | 10 ++--- .../SeekableStreamSupervisorStateTest.java | 20 +++++----- .../common/exception/DruidException.java | 10 ----- .../common/exception/DruidExceptionTest.java | 39 ++++++++++++++++++ .../metadata/EntryExistsExceptionTest.java | 40 +++++++++++++++++++ .../SQLMetadataStorageActionHandlerTest.java | 8 ++-- 7 files changed, 99 insertions(+), 30 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java create mode 100644 processing/src/test/java/org/apache/druid/metadata/EntryExistsExceptionTest.java diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 258f80ebeb6b..19caa96ead94 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -3465,7 +3465,7 @@ public void testCheckpointForUnknownTaskGroup() } @Test - public void testSuspendedNoRunningTasks() throws Exception + public void testSuspendedNoRunningTasks() { supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 15e59227c272..f8560ada6d2b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -427,7 +427,7 @@ public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException } @Test - public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() throws Exception + public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() { // ensure that if we don't know how to deserialize a task it won't explode the lockbox // (or anything else that uses taskStorage.getActiveTasks() and doesn't expect null which is most things) @@ -1201,7 +1201,7 @@ public void testGetLockedIntervals() } @Test - public void testGetLockedIntervalsForLowPriorityTask() throws Exception + public void testGetLockedIntervalsForLowPriorityTask() { // Acquire lock for a low priority task final Task lowPriorityTask = NoopTask.create(5); @@ -1221,7 +1221,7 @@ public void testGetLockedIntervalsForLowPriorityTask() throws Exception } @Test - public void testGetLockedIntervalsForEqualPriorityTask() throws Exception + public void testGetLockedIntervalsForEqualPriorityTask() { // Acquire lock for a low priority task final Task task = NoopTask.create(5); @@ -1663,7 +1663,7 @@ public void testGetLockedIntervalsForRevokedLocks() throws Exception } @Test - public void testFailedToReacquireTaskLock() throws Exception + public void testFailedToReacquireTaskLock() { // Tasks to be failed have a group id with the substring "FailingLockAcquisition" // Please refer to NullLockPosseTaskLockbox @@ -1785,7 +1785,7 @@ public void expectActiveLocks(TaskLock... locks) } } - private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority) throws Exception + private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority) { final Task task = NoopTask.create(priority); tasks.add(task); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index d622984b0bab..b8d5a556eef6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -139,7 +139,7 @@ public void setupTest() taskClientFactory = createMock(SeekableStreamIndexTaskClientFactory.class); spec = createMock(SeekableStreamSupervisorSpec.class); indexTaskClient = createMock(SeekableStreamIndexTaskClient.class); - recordSupplier = (RecordSupplier) createMock(RecordSupplier.class); + recordSupplier = createMock(RecordSupplier.class); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -175,7 +175,7 @@ public void setupTest() } @Test - public void testRunning() throws Exception + public void testRunning() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -214,7 +214,7 @@ public void testRunning() throws Exception } @Test - public void testRunningStreamGetSequenceNumberReturnsNull() throws Exception + public void testRunningStreamGetSequenceNumberReturnsNull() { EasyMock.reset(recordSupplier); EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes(); @@ -266,7 +266,7 @@ public void testRunningStreamGetSequenceNumberReturnsNull() throws Exception } @Test - public void testConnectingToStreamFail() throws Exception + public void testConnectingToStreamFail() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)) @@ -321,7 +321,7 @@ public void testConnectingToStreamFail() throws Exception } @Test - public void testConnectingToStreamFailRecoveryFailRecovery() throws Exception + public void testConnectingToStreamFailRecoveryFailRecovery() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)) @@ -395,7 +395,7 @@ public void testConnectingToStreamFailRecoveryFailRecovery() throws Exception } @Test - public void testDiscoveringInitialTasksFailRecoveryFail() throws Exception + public void testDiscoveringInitialTasksFailRecoveryFail() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -560,7 +560,7 @@ public Duration getEmitterPeriod() } @Test - public void testCreatingTasksFailRecoveryFail() throws Exception + public void testCreatingTasksFailRecoveryFail() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -638,7 +638,7 @@ public void testCreatingTasksFailRecoveryFail() throws Exception } @Test - public void testSuspended() throws Exception + public void testSuspended() { EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -677,7 +677,7 @@ public void testSuspended() throws Exception } @Test - public void testStopping() throws Exception + public void testStopping() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -718,7 +718,7 @@ public void testStopping() throws Exception } @Test - public void testStoppingGracefully() throws Exception + public void testStoppingGracefully() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); diff --git a/processing/src/main/java/org/apache/druid/common/exception/DruidException.java b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java index 905ce11449d5..97aeba507ec6 100644 --- a/processing/src/main/java/org/apache/druid/common/exception/DruidException.java +++ b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java @@ -32,21 +32,11 @@ public class DruidException extends RuntimeException private final int responseCode; private final boolean isTransient; - public DruidException(String message) - { - this(message, DEFAULT_HTTP_CODE, null, DEFAULT_CAN_RETRY); - } - public DruidException(String message, Throwable cause) { this(message, DEFAULT_HTTP_CODE, cause, DEFAULT_CAN_RETRY); } - public DruidException(String message, int responseCode) - { - this(message, responseCode, null, DEFAULT_CAN_RETRY); - } - public DruidException(String message, int responseCode, Throwable cause) { this(message, responseCode, cause, false); diff --git a/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java b/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java new file mode 100644 index 000000000000..e81d29f35b84 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.exception; + +import org.junit.Assert; +import org.junit.Test; + +public class DruidExceptionTest +{ + @Test + public void testExceptionMessageAndResponseCode() + { + DruidException exception = Assert.assertThrows( + DruidException.class, + () -> { + throw new DruidException("an error has occurred", 401, null); + } + ); + Assert.assertEquals("an error has occurred", exception.getMessage()); + Assert.assertEquals(401, exception.getResponseCode()); + } +} diff --git a/processing/src/test/java/org/apache/druid/metadata/EntryExistsExceptionTest.java b/processing/src/test/java/org/apache/druid/metadata/EntryExistsExceptionTest.java new file mode 100644 index 000000000000..68b8a39b0d45 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/metadata/EntryExistsExceptionTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import org.apache.druid.common.exception.DruidException; +import org.junit.Assert; +import org.junit.Test; + +public class EntryExistsExceptionTest +{ + @Test + public void testExceptionMessageAndResponseCode() + { + EntryExistsException exception = Assert.assertThrows( + EntryExistsException.class, + () -> { + throw new EntryExistsException("task", "100"); + } + ); + Assert.assertEquals("task [100] already exists.", exception.getMessage()); + Assert.assertEquals(DruidException.HTTP_CODE_BAD_REQUEST, exception.getResponseCode()); + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index a7c19ff978bc..7b9b41add6ad 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -127,7 +127,7 @@ public TypeReference> getLockType() } @Test - public void testEntryAndStatus() throws Exception + public void testEntryAndStatus() { Map entry = ImmutableMap.of("numericId", 1234); Map status1 = ImmutableMap.of("count", 42); @@ -256,7 +256,7 @@ public void testGetRecentStatuses2() throws EntryExistsException } @Test(timeout = 60_000L) - public void testRepeatInsert() throws Exception + public void testRepeatInsert() { final String entryId = "abcd"; Map entry = ImmutableMap.of("a", 1); @@ -269,7 +269,7 @@ public void testRepeatInsert() throws Exception } @Test - public void testLogs() throws Exception + public void testLogs() { final String entryId = "abcd"; Map entry = ImmutableMap.of("a", 1); @@ -404,7 +404,7 @@ public void testGetLockId() throws EntryExistsException } @Test - public void testRemoveTasksOlderThan() throws Exception + public void testRemoveTasksOlderThan() { final String entryId1 = "1234"; Map entry1 = ImmutableMap.of("numericId", 1234); From 983c5a2b0c80f12f1d32c5c10a332891e9bee7aa Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 30 May 2023 11:59:54 +0530 Subject: [PATCH 08/12] Address inspections --- .../druid/indexing/overlord/TaskLockboxTest.java | 6 +++--- .../apache/druid/metadata/SQLMetadataConnector.java | 3 ++- .../metadata/SQLMetadataStorageActionHandler.java | 12 ++++++------ .../SQLMetadataStorageActionHandlerTest.java | 2 +- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index f8560ada6d2b..b04d20e7dd79 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1619,7 +1619,7 @@ public void testReplaceLockCanRevokeAllIncompatible() throws Exception } @Test - public void testGetLockedIntervalsForRevokedLocks() throws Exception + public void testGetLockedIntervalsForRevokedLocks() { // Acquire lock for a low priority task final Task lowPriorityTask = NoopTask.create(5); @@ -1744,7 +1744,7 @@ private class TaskLockboxValidator this.taskStorage = taskStorage; } - public TaskLock expectLockCreated(TaskLockType type, Interval interval, int priority) throws Exception + public TaskLock expectLockCreated(TaskLockType type, Interval interval, int priority) { final TaskLock lock = tryTaskLock(type, interval, priority); Assert.assertNotNull(lock); @@ -1757,7 +1757,7 @@ public void revokeLock(TaskLock lock) lockbox.revokeLock(lockToTaskIdMap.get(lock), lock); } - public void expectLockNotGranted(TaskLockType type, Interval interval, int priority) throws Exception + public void expectLockNotGranted(TaskLockType type, Interval interval, int priority) { final TaskLock lock = tryTaskLock(type, interval, priority); Assert.assertNull(lock); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 2a20c8f72214..108128e09c2f 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -59,6 +59,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector private static final Logger log = new Logger(SQLMetadataConnector.class); private static final String PAYLOAD_TYPE = "BLOB"; private static final String COLLATION = ""; + static final int DEFAULT_MAX_TRIES = 10; private final Supplier config; @@ -175,7 +176,7 @@ public final boolean isTransientException(Throwable e) } /** - * Vendor specific errors that are not covered by {@link #isTransientException(Throwable)}. + * Vendor specific errors that are not covered by {@link #isTransientException(Throwable)} */ protected boolean connectorIsTransientException(Throwable e) { diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 50701669b72e..b3d5729cfcf9 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -167,18 +167,18 @@ public void insert( try { getConnector().retryWithHandle( handle -> insertEntryWithHandle(handle, id, timestamp, dataSource, entry, active, status, type, groupId), - this::isTransientException + this::isTransientDruidException ); } catch (CallbackFailedException e) { - propageError(e.getCause()); + propageAsRuntimeException(e.getCause()); } catch (Exception e) { - propageError(e); + propageAsRuntimeException(e); } } - private void propageError(Throwable t) + private void propageAsRuntimeException(Throwable t) { Throwables.propagateIfPossible(t); throw new RuntimeException(t); @@ -230,10 +230,10 @@ public static boolean isStatementException(Throwable e) (e instanceof CallbackFailedException && e.getCause() instanceof StatementException); } - private boolean isTransientException(Throwable t) + private boolean isTransientDruidException(Throwable t) { if (t instanceof CallbackFailedException) { - return isTransientException(t.getCause()); + return isTransientDruidException(t.getCause()); } else if (t instanceof DruidException) { return ((DruidException) t).isTransient(); } diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index 7b9b41add6ad..2ade4f960196 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -301,7 +301,7 @@ public void testLogs() @Test - public void testLocks() throws Exception + public void testLocks() { final String entryId = "ABC123"; Map entry = ImmutableMap.of("a", 1); From 5f8f205439e3780012e2cc81c583dd89f32b00e6 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 30 May 2023 13:38:05 +0530 Subject: [PATCH 09/12] More inspection fixes --- .../indexing/overlord/TaskLockboxTest.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index b04d20e7dd79..c9f9d72beaae 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -149,7 +149,7 @@ private LockResult tryTimeChunkLock(TaskLockType lockType, Task task, Interval i } @Test - public void testLock() throws Exception + public void testLock() { validator.expectLockCreated( TaskLockType.EXCLUSIVE, @@ -176,7 +176,7 @@ public void testLockAfterTaskComplete() throws InterruptedException } @Test - public void testTrySharedLock() throws Exception + public void testTrySharedLock() { final Interval interval = Intervals.of("2017-01/2017-02"); @@ -1245,7 +1245,7 @@ public void testGetLockedIntervalsForEqualPriorityTask() } @Test - public void testExclusiveLockCompatibility() throws Exception + public void testExclusiveLockCompatibility() { final TaskLock theLock = validator.expectLockCreated( TaskLockType.EXCLUSIVE, @@ -1282,7 +1282,7 @@ public void testExclusiveLockCompatibility() throws Exception } @Test - public void testExclusiveLockCanRevokeAllIncompatible() throws Exception + public void testExclusiveLockCanRevokeAllIncompatible() { final TaskLockboxValidator validator = new TaskLockboxValidator(lockbox, taskStorage); @@ -1325,7 +1325,7 @@ public void testExclusiveLockCanRevokeAllIncompatible() throws Exception } @Test - public void testSharedLockCompatibility() throws Exception + public void testSharedLockCompatibility() { final TaskLock theLock = validator.expectLockCreated( TaskLockType.SHARED, @@ -1374,7 +1374,7 @@ public void testSharedLockCompatibility() throws Exception } @Test - public void testSharedLockCanRevokeAllIncompatible() throws Exception + public void testSharedLockCanRevokeAllIncompatible() { final TaskLock exclusiveLock = validator.expectLockCreated( TaskLockType.EXCLUSIVE, @@ -1415,7 +1415,7 @@ public void testSharedLockCanRevokeAllIncompatible() throws Exception } @Test - public void testAppendLockCompatibility() throws Exception + public void testAppendLockCompatibility() { final TaskLock theLock = validator.expectLockCreated( TaskLockType.APPEND, @@ -1473,7 +1473,7 @@ public void testAppendLockCompatibility() throws Exception } @Test - public void testAppendLockCanRevokeAllIncompatible() throws Exception + public void testAppendLockCanRevokeAllIncompatible() { final TaskLock sharedLock = validator.expectLockCreated( TaskLockType.SHARED, @@ -1521,7 +1521,7 @@ public void testAppendLockCanRevokeAllIncompatible() throws Exception @Test - public void testReplaceLockCompatibility() throws Exception + public void testReplaceLockCompatibility() { final TaskLock theLock = validator.expectLockCreated( TaskLockType.REPLACE, @@ -1566,7 +1566,7 @@ public void testReplaceLockCompatibility() throws Exception } @Test - public void testReplaceLockCanRevokeAllIncompatible() throws Exception + public void testReplaceLockCanRevokeAllIncompatible() { final TaskLock appendLock0 = validator.expectLockCreated( TaskLockType.APPEND, @@ -1704,7 +1704,7 @@ public void testFailedToReacquireTaskLock() } @Test - public void testConflictsWithOverlappingSharedLocks() throws Exception + public void testConflictsWithOverlappingSharedLocks() { TaskLock conflictingLock = validator.expectLockCreated( TaskLockType.SHARED, From 26aed0c9babd919f54d6f053af9150b881eddcdf Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 2 Jun 2023 19:33:00 +0530 Subject: [PATCH 10/12] Fixes from feedback --- .../indexing/overlord/http/OverlordResource.java | 4 ---- .../SQLMetadataStorageActionHandler.java | 16 ++++++++-------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 5b130585c40c..c6de461f1ff4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -232,10 +232,6 @@ public Response taskPost(final Task task, @Context final HttpServletRequest req) .entity(ImmutableMap.of("error", e.getMessage())) .build(); } - catch (Exception e) { - log.info("Got final exception [%s]", e.getClass()); - throw e; - } } ); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index b3d5729cfcf9..840351a922d6 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -171,25 +171,25 @@ public void insert( ); } catch (CallbackFailedException e) { - propageAsRuntimeException(e.getCause()); + propagateAsRuntimeException(e.getCause()); } catch (Exception e) { - propageAsRuntimeException(e); + propagateAsRuntimeException(e); } } - private void propageAsRuntimeException(Throwable t) + private void propagateAsRuntimeException(Throwable t) { Throwables.propagateIfPossible(t); throw new RuntimeException(t); } /** - * Inserts the given entry into the metadata store. Any exception thrown by a - * HandleCallback is wrapped in a {@link DruidException} wrapped in a - * {@link CallbackFailedException}. + * Inserts the given entry into the metadata store. This method wraps any + * exception thrown in a {@link DruidException}. When used in a HandleCallback, + * that exception is further wrapped in a {@link CallbackFailedException}. */ - private int insertEntryWithHandle( + private Void insertEntryWithHandle( Handle handle, String entryId, DateTime timestamp, @@ -217,7 +217,7 @@ private int insertEntryWithHandle( .bind("active", active) .bind("status_payload", jsonMapper.writeValueAsBytes(status)) .execute(); - return 1; + return null; } catch (Throwable t) { throw wrapInDruidException(entryId, t); From 069f4e43a50dbf173ee9f3371575f3be23f580fd Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 7 Jun 2023 16:05:57 +0530 Subject: [PATCH 11/12] Fix method isTransientDruidException --- .../apache/druid/common/exception/DruidException.java | 9 +-------- .../metadata/SQLMetadataStorageActionHandler.java | 11 ++++++++--- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/common/exception/DruidException.java b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java index 97aeba507ec6..1d0c935b02a1 100644 --- a/processing/src/main/java/org/apache/druid/common/exception/DruidException.java +++ b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java @@ -24,19 +24,12 @@ */ public class DruidException extends RuntimeException { - public static final int DEFAULT_HTTP_CODE = 500; + public static final int HTTP_CODE_SERVER_ERROR = 500; public static final int HTTP_CODE_BAD_REQUEST = 400; - private static final boolean DEFAULT_CAN_RETRY = false; - private final int responseCode; private final boolean isTransient; - public DruidException(String message, Throwable cause) - { - this(message, DEFAULT_HTTP_CODE, cause, DEFAULT_CAN_RETRY); - } - public DruidException(String message, int responseCode, Throwable cause) { this(message, responseCode, cause, false); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 840351a922d6..059cd153eb85 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -236,9 +236,9 @@ private boolean isTransientDruidException(Throwable t) return isTransientDruidException(t.getCause()); } else if (t instanceof DruidException) { return ((DruidException) t).isTransient(); + } else { + return getConnector().isTransientException(t); } - - return false; } @Override @@ -445,7 +445,12 @@ private DruidException wrapInDruidException(String taskId, Throwable t) t ); } else { - return new DruidException(StringUtils.format("Encountered metadata exception for task [%s]", taskId), t); + return new DruidException( + StringUtils.format("Encountered metadata exception for task [%s]", taskId), + DruidException.HTTP_CODE_SERVER_ERROR, + t, + connector.isTransientException(t) + ); } } From 5bfce05856f01fd81ea10acfa639992feb3be510 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 10 Jun 2023 08:42:56 +0530 Subject: [PATCH 12/12] Remove unused DruidException constructor --- .../org/apache/druid/common/exception/DruidException.java | 8 +------- .../apache/druid/common/exception/DruidExceptionTest.java | 3 ++- .../metadata/MetadataStorageConnectorConfigTest.java | 8 ++------ .../druid/metadata/SQLMetadataStorageActionHandler.java | 3 ++- 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/common/exception/DruidException.java b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java index 1d0c935b02a1..638653fc5cee 100644 --- a/processing/src/main/java/org/apache/druid/common/exception/DruidException.java +++ b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java @@ -30,11 +30,6 @@ public class DruidException extends RuntimeException private final int responseCode; private final boolean isTransient; - public DruidException(String message, int responseCode, Throwable cause) - { - this(message, responseCode, cause, false); - } - public DruidException(String message, int responseCode, Throwable cause, boolean isTransient) { super(message, cause); @@ -49,8 +44,7 @@ public int getResponseCode() /** * Returns true if this is a transient exception and might go away if the - * operation is retried. All DruidException are non-transient by default - * unless specified otherwise. + * operation is retried. */ public boolean isTransient() { diff --git a/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java b/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java index e81d29f35b84..9f0d53930e55 100644 --- a/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java +++ b/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java @@ -30,10 +30,11 @@ public void testExceptionMessageAndResponseCode() DruidException exception = Assert.assertThrows( DruidException.class, () -> { - throw new DruidException("an error has occurred", 401, null); + throw new DruidException("an error has occurred", 401, null, true); } ); Assert.assertEquals("an error has occurred", exception.getMessage()); Assert.assertEquals(401, exception.getResponseCode()); + Assert.assertTrue(exception.isTransient()); } } diff --git a/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java b/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java index 2d34af4928b1..31c3ad33bcc5 100644 --- a/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java +++ b/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java @@ -194,12 +194,8 @@ private void testDbcpPropertiesFile( public void testCreate() { Map props = ImmutableMap.of("key", "value"); - MetadataStorageConnectorConfig config = MetadataStorageConnectorConfig.create( - "connectURI", - "user", - "pwd", - props - ); + MetadataStorageConnectorConfig config = + MetadataStorageConnectorConfig.create("connectURI", "user", "pwd", props); Assert.assertEquals("connectURI", config.getConnectURI()); Assert.assertEquals("user", config.getUser()); Assert.assertEquals("pwd", config.getPassword()); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 059cd153eb85..8d84432ff1ef 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -442,7 +442,8 @@ private DruidException wrapInDruidException(String taskId, Throwable t) taskId ), DruidException.HTTP_CODE_BAD_REQUEST, - t + t, + false ); } else { return new DruidException(