Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3465,7 +3465,7 @@ public void testCheckpointForUnknownTaskGroup()
}

@Test
public void testSuspendedNoRunningTasks() throws Exception
public void testSuspendedNoRunningTasks()
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ public class MySQLConnector extends SQLMetadataConnector
private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT";
private static final String QUOTE_STRING = "`";
private static final String COLLATION = "CHARACTER SET utf8mb4 COLLATE utf8mb4_bin";
private static final String MYSQL_TRANSIENT_EXCEPTION_CLASS_NAME = "com.mysql.jdbc.exceptions.MySQLTransientException";
private static final String MYSQL_TRANSIENT_EXCEPTION_CLASS_NAME
= "com.mysql.jdbc.exceptions.MySQLTransientException";
private static final String MARIA_DB_PACKET_EXCEPTION_CLASS_NAME
= "org.mariadb.jdbc.internal.util.exceptions.MaxAllowedPacketException";
private static final String MYSQL_PACKET_EXCEPTION_CLASS_NAME
= "com.mysql.jdbc.PacketTooBigException";

@Nullable
private final Class<?> myTransientExceptionClass;
Expand Down Expand Up @@ -218,6 +223,19 @@ protected boolean connectorIsTransientException(Throwable e)
return false;
}

@Override
protected boolean isRootCausePacketTooBigException(Throwable t)
{
if (t == null) {
return false;
}

final String className = t.getClass().getName();
return MARIA_DB_PACKET_EXCEPTION_CLASS_NAME.equals(className)
|| MYSQL_PACKET_EXCEPTION_CLASS_NAME.equals(className)
|| isRootCausePacketTooBigException(t.getCause());
}

@Override
public Void insertOrUpdate(
final String tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.sql.SQLException;
import java.sql.SQLTransientConnectionException;
import java.sql.SQLTransientException;

public class MySQLConnectorTest
{
Expand Down Expand Up @@ -59,10 +60,10 @@ public void testIsExceptionTransientMySql()
Assert.assertTrue(connector.connectorIsTransientException(new MySQLTransientException()));
Assert.assertTrue(connector.connectorIsTransientException(new MySQLTransactionRollbackException()));
Assert.assertTrue(
connector.connectorIsTransientException(new SQLException("some transient failure", "wtf", 1317))
connector.connectorIsTransientException(new SQLException("some transient failure", "s0", 1317))
);
Assert.assertFalse(
connector.connectorIsTransientException(new SQLException("totally realistic test data", "wtf", 1337))
connector.connectorIsTransientException(new SQLException("totally realistic test data", "s0", 1337))
);
// this method does not specially handle normal transient exceptions either, since it is not vendor specific
Assert.assertFalse(
Expand All @@ -82,16 +83,43 @@ public void testIsExceptionTransientNoMySqlClazz()
// no vendor specific for MariaDb, so should always be false
Assert.assertFalse(connector.connectorIsTransientException(new MySQLTransientException()));
Assert.assertFalse(
connector.connectorIsTransientException(new SQLException("some transient failure", "wtf", 1317))
connector.connectorIsTransientException(new SQLException("some transient failure", "s0", 1317))
);
Assert.assertFalse(
connector.connectorIsTransientException(new SQLException("totally realistic test data", "wtf", 1337))
connector.connectorIsTransientException(new SQLException("totally realistic test data", "s0", 1337))
);
Assert.assertFalse(
connector.connectorIsTransientException(new SQLTransientConnectionException("transient"))
);
}

@Test
public void testIsRootCausePacketTooBigException()
{
MySQLConnector connector = new MySQLConnector(
CONNECTOR_CONFIG_SUPPLIER,
TABLES_CONFIG_SUPPLIER,
new MySQLConnectorSslConfig(),
MYSQL_DRIVER_CONFIG
);

// The test method should return true only for
// mariadb.MaxAllowedPacketException or mysql.PacketTooBigException.
// Verifying this requires creating a mock Class object, but Class is final
// and has only a private constructor. It would be overkill to try to mock it.

// Verify some of the false cases
Assert.assertFalse(
connector.isRootCausePacketTooBigException(new SQLException())
);
Assert.assertFalse(
connector.isRootCausePacketTooBigException(new SQLTransientException())
);
Assert.assertFalse(
connector.isRootCausePacketTooBigException(new MySQLTransientException())
);
}

@Test
public void testLimitClause()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void submitNewTask(
SubTaskSpec<SubTaskType> spec
)
{
LOG.info("Submit a new task for spec[%s]", spec.getId());
LOG.info("Submitting a new task for spec[%s]", spec.getId());
final ListenableFuture<SubTaskCompleteEvent<SubTaskType>> future = taskMonitor.submit(spec);
Futures.addCallback(
future,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void insert(Task task, TaskStatus status) throws EntryExistsException
TaskStuff newTaskStuff = new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource());
TaskStuff alreadyExisted = tasks.putIfAbsent(task.getId(), newTaskStuff);
if (alreadyExisted != null) {
throw new EntryExistsException(task.getId());
throw new EntryExistsException("Task", task.getId());
}

log.info("Inserted task %s with status: %s", task.getId(), status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.common.exception.DruidException;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
Expand Down Expand Up @@ -139,7 +139,7 @@ public void insert(final Task task, final TaskStatus status) throws EntryExistsE
status.getId()
);

log.info("Inserting task %s with status: %s", task.getId(), status);
log.info("Inserting task [%s] with status [%s].", task.getId(), status);

try {
handler.insert(
Expand All @@ -153,12 +153,11 @@ public void insert(final Task task, final TaskStatus status) throws EntryExistsE
task.getGroupId()
);
}
catch (DruidException e) {
throw e;
}
catch (Exception e) {
if (e instanceof EntryExistsException) {
throw (EntryExistsException) e;
} else {
throw new RuntimeException(e);
}
throw new RuntimeException(e);
}
}

Expand All @@ -167,15 +166,12 @@ public void setStatus(final TaskStatus status)
{
Preconditions.checkNotNull(status, "status");

log.info("Updating task %s to status: %s", status.getId(), status);
final String taskId = status.getId();
log.info("Updating status of task [%s] to [%s].", taskId, status);

final boolean set = handler.setStatus(
status.getId(),
status.isRunnable(),
status
);
final boolean set = handler.setStatus(taskId, status.isRunnable(), status);
if (!set) {
throw new ISE("Active task not found: %s", status.getId());
throw new ISE("No active task for id [%s]", taskId);
}
}

Expand Down Expand Up @@ -279,10 +275,8 @@ public void addLock(final String taskid, final TaskLock taskLock)
Preconditions.checkNotNull(taskLock, "taskLock");

log.info(
"Adding lock on interval[%s] version[%s] for task: %s",
taskLock.getInterval(),
taskLock.getVersion(),
taskid
"Adding lock on interval[%s] version[%s] for task [%s].",
taskLock.getInterval(), taskLock.getVersion(), taskid
);

handler.addLock(taskid, taskLock);
Expand All @@ -296,15 +290,13 @@ public void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock)
Preconditions.checkNotNull(newLock, "newLock");

log.info(
"Replacing an existing lock[%s] with a new lock[%s] for task: %s",
oldLock,
newLock,
taskid
"Replacing an existing lock[%s] with a new lock[%s] for task [%s].",
oldLock, newLock, taskid
);

final Long oldLockId = handler.getLockId(taskid, oldLock);
if (oldLockId == null) {
throw new ISE("Cannot find an existing lock[%s]", oldLock);
throw new ISE("Cannot find lock[%s] for task [%s]", oldLock, taskid);
}

handler.replaceLock(taskid, oldLockId, newLock);
Expand Down Expand Up @@ -336,14 +328,8 @@ public List<TaskLock> getLocks(String taskid)
{
return ImmutableList.copyOf(
Iterables.transform(
getLocksWithIds(taskid).entrySet(), new Function<Map.Entry<Long, TaskLock>, TaskLock>()
{
@Override
public TaskLock apply(Map.Entry<Long, TaskLock> e)
{
return e.getValue();
}
}
getLocksWithIds(taskid).entrySet(),
Entry::getValue
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public boolean add(final Task task) throws EntryExistsException
IdUtils.validateId("Task ID", task.getId());

if (taskStorage.getTask(task.getId()).isPresent()) {
throw new EntryExistsException(StringUtils.format("Task %s already exists", task.getId()));
throw new EntryExistsException("Task", task.getId());
}

// Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.common.exception.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
Expand Down Expand Up @@ -64,7 +65,6 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
Expand Down Expand Up @@ -222,25 +222,15 @@ public Response taskPost(final Task task, @Context final HttpServletRequest req)

return asLeaderWith(
taskMaster.getTaskQueue(),
new Function<TaskQueue, Response>()
{
@Override
public Response apply(TaskQueue taskQueue)
{
try {
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (EntryExistsException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(
ImmutableMap.of(
"error",
StringUtils.format("Task[%s] already exists!", task.getId())
)
)
.build();
}
taskQueue -> {
try {
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (DruidException e) {
return Response.status(e.getResponseCode())
.entity(ImmutableMap.of("error", e.getMessage()))
.build();
}
}
);
Expand Down
Loading