Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriver;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
Expand Down Expand Up @@ -282,7 +282,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception

try (
final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
final AppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) {
toolbox.getDataSegmentServerAnnouncer().announce();
Expand Down Expand Up @@ -871,13 +871,13 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox
);
}

private FiniteAppenderatorDriver newDriver(
private AppenderatorDriver newDriver(
final Appenderator appenderator,
final TaskToolbox toolbox,
final FireDepartmentMetrics metrics
)
{
return new FiniteAppenderatorDriver(
return new AppenderatorDriver(
appenderator,
new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema),
toolbox.getSegmentHandoffNotifierFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver;
import io.druid.segment.realtime.appenderator.AppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
Expand Down Expand Up @@ -400,7 +400,7 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin

try (
final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema);
final FiniteAppenderatorDriver driver = newDriver(
final AppenderatorDriver driver = newDriver(
appenderator,
toolbox,
segmentAllocator,
Expand Down Expand Up @@ -543,14 +543,14 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox
);
}

private FiniteAppenderatorDriver newDriver(
private AppenderatorDriver newDriver(
final Appenderator appenderator,
final TaskToolbox toolbox,
final SegmentAllocator segmentAllocator,
final FireDepartmentMetrics metrics
)
{
return new FiniteAppenderatorDriver(
return new AppenderatorDriver(
appenderator,
segmentAllocator,
new NoopSegmentHandoffNotifierFactory(), // don't wait for handoff since we don't serve queries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import java.util.stream.Collectors;

/**
* A FiniteAppenderatorDriver drives an Appenderator to index a finite stream of data. This class does not help you
* A AppenderatorDriver drives an Appenderator to index a finite stream of data. This class does not help you
* index unbounded streams. All handoff is done at the end of indexing.
*
* This class helps with doing things that Appenderators don't, including deciding which segments to use (with a
Expand All @@ -74,9 +74,9 @@
* Note that the commit metadata stored by this class via the underlying Appenderator is not the same metadata as
* you pass in. It's wrapped in some extra metadata needed by the driver.
*/
public class FiniteAppenderatorDriver implements Closeable
public class AppenderatorDriver implements Closeable
{
private static final Logger log = new Logger(FiniteAppenderatorDriver.class);
private static final Logger log = new Logger(AppenderatorDriver.class);

private final Appenderator appenderator;
private final SegmentAllocator segmentAllocator;
Expand Down Expand Up @@ -111,7 +111,7 @@ public class FiniteAppenderatorDriver implements Closeable
* @param objectMapper object mapper, used for serde of commit metadata
* @param metrics Firedepartment metrics
*/
public FiniteAppenderatorDriver(
public AppenderatorDriver(
Appenderator appenderator,
SegmentAllocator segmentAllocator,
SegmentHandoffNotifierFactory handoffNotifierFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import javax.annotation.Nullable;

/**
* Result of {@link FiniteAppenderatorDriver#add(InputRow, String, Supplier)}. It contains the identifier of the
* Result of {@link AppenderatorDriver#add(InputRow, String, Supplier)}. It contains the identifier of the
* segment which the InputRow is added to and the number of rows in that segment.
*/
public class AppenderatorDriverAddResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import io.druid.query.SegmentDescriptor;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestCommitterSupplier;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestSegmentAllocator;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestSegmentHandoffNotifierFactory;
import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestCommitterSupplier;
import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentAllocator;
import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
Expand All @@ -65,7 +65,7 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class FiniteAppenderatorDriverFailTest
public class AppenderatorDriverFailTest
{
private static final String DATA_SOURCE = "foo";
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
Expand All @@ -91,7 +91,7 @@ public class FiniteAppenderatorDriverFailTest

SegmentAllocator allocator;
TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
FiniteAppenderatorDriver driver;
AppenderatorDriver driver;

@Rule
public ExpectedException expectedException = ExpectedException.none();
Expand All @@ -117,7 +117,7 @@ public void testFailDuringPersist() throws IOException, InterruptedException, Ti
{
expectedException.expect(TimeoutException.class);

driver = new FiniteAppenderatorDriver(
driver = new AppenderatorDriver(
createPersistFailAppenderator(),
allocator,
segmentHandoffNotifierFactory,
Expand All @@ -139,7 +139,7 @@ public void testFailDuringPersist() throws IOException, InterruptedException, Ti
}

driver.publish(
FiniteAppenderatorDriverTest.makeOkPublisher(),
AppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Expand All @@ -151,7 +151,7 @@ public void testInterruptDuringPush() throws IOException, InterruptedException,
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class));

driver = new FiniteAppenderatorDriver(
driver = new AppenderatorDriver(
createPushInterruptAppenderator(),
allocator,
segmentHandoffNotifierFactory,
Expand All @@ -173,7 +173,7 @@ public void testInterruptDuringPush() throws IOException, InterruptedException,
}

driver.publish(
FiniteAppenderatorDriverTest.makeOkPublisher(),
AppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Expand All @@ -184,7 +184,7 @@ public void testFailDuringPush() throws IOException, InterruptedException, Timeo
{
expectedException.expect(TimeoutException.class);

driver = new FiniteAppenderatorDriver(
driver = new AppenderatorDriver(
createPushFailAppenderator(),
allocator,
segmentHandoffNotifierFactory,
Expand All @@ -206,7 +206,7 @@ public void testFailDuringPush() throws IOException, InterruptedException, Timeo
}

driver.publish(
FiniteAppenderatorDriverTest.makeOkPublisher(),
AppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Expand All @@ -221,7 +221,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo
"Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"
);

driver = new FiniteAppenderatorDriver(
driver = new AppenderatorDriver(
createDropFailAppenderator(),
allocator,
segmentHandoffNotifierFactory,
Expand All @@ -243,7 +243,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo
}

final SegmentsAndMetadata published = driver.publish(
FiniteAppenderatorDriverTest.makeOkPublisher(),
AppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class FiniteAppenderatorDriverTest
public class AppenderatorDriverTest
{
private static final String DATA_SOURCE = "foo";
private static final String VERSION = "abc123";
Expand Down Expand Up @@ -95,15 +95,15 @@ public class FiniteAppenderatorDriverTest
SegmentAllocator allocator;
AppenderatorTester appenderatorTester;
TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
FiniteAppenderatorDriver driver;
AppenderatorDriver driver;

@Before
public void setUp()
{
appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
driver = new FiniteAppenderatorDriver(
driver = new AppenderatorDriver(
appenderatorTester.getAppenderator(),
allocator,
segmentHandoffNotifierFactory,
Expand Down