package io.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;

/**
 * Factory for {@link Appenderator} instances, deserializable from spec JSON via
 * the "type" property. The only registered subtype is "default", mapping to
 * {@link DefaultAppenderatorFactory}.
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = "default", value = DefaultAppenderatorFactory.class)
})
public interface AppenderatorFactory
{
  /**
   * Builds an Appenderator for a particular datasource.
   *
   * @param schema  data schema of the datasource being ingested
   * @param config  realtime tuning configuration
   * @param metrics sink for ingestion/persist metrics
   *
   * @return a new Appenderator; the caller owns its lifecycle (startJob/close)
   */
  Appenderator build(
      DataSchema schema,
      RealtimeTuningConfig config,
      FireDepartmentMetrics metrics
  );
}
package io.druid.segment.realtime.appenderator;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.Sequence;
import com.metamx.emitter.EmittingLogger;

import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.RejectionPolicy;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;

import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * A {@link Plumber} implemented on top of an {@link Appenderator}: rows are routed
 * to per-granularity {@link SegmentIdentifier}s, periodically merged and pushed by
 * a background "overseer" thread, and published via a {@link SegmentPublisher}.
 *
 * NOTE(review): the source this was recovered from had all generic type parameters
 * stripped by text extraction; they have been restored here — verify against upstream.
 */
public class AppenderatorPlumber implements Plumber
{
  private static final EmittingLogger log = new EmittingLogger(AppenderatorPlumber.class);

  // If a persist call blocks ingestion longer than this many millis, log a warning.
  private static final int WARN_DELAY = 1000;

  private final DataSchema schema;
  private final RealtimeTuningConfig config;
  private final RejectionPolicy rejectionPolicy;
  private final FireDepartmentMetrics metrics;
  private final DataSegmentAnnouncer segmentAnnouncer;
  private final SegmentPublisher segmentPublisher;
  private final SegmentHandoffNotifier handoffNotifier;

  // Lock/condition used by finishJob() to wait for "segments" to drain.
  // NOTE(review): nothing in this class calls handoffCondition.notify/notifyAll,
  // so finishJob() can only exit the wait via spurious wakeup — confirm a handoff
  // callback elsewhere performs the notification.
  private final Object handoffCondition = new Object();

  // Active segments keyed by interval start millis (segment-granularity truncated).
  private final Map<Long, SegmentIdentifier> segments = Maps.newConcurrentMap();
  private final Appenderator appenderator;

  private volatile boolean shuttingDown = false;
  private volatile boolean stopped = false;
  private volatile boolean cleanShutdown = true;
  private volatile ScheduledExecutorService scheduledExecutor = null;

  // Most recent committer supplier seen by add(); used to decide whether there is
  // anything worth pushing during shutdown.
  private volatile Supplier<Committer> lastCommitterSupplier = null;

  public AppenderatorPlumber(
      DataSchema schema,
      RealtimeTuningConfig config,
      FireDepartmentMetrics metrics,
      DataSegmentAnnouncer segmentAnnouncer,
      SegmentPublisher segmentPublisher,
      SegmentHandoffNotifier handoffNotifier,
      Appenderator appenderator
  )
  {
    this.schema = schema;
    this.config = config;
    this.rejectionPolicy = config.getRejectionPolicyFactory().create(config.getWindowPeriod());
    this.metrics = metrics;
    this.segmentAnnouncer = segmentAnnouncer;
    this.segmentPublisher = segmentPublisher;
    this.handoffNotifier = handoffNotifier;
    this.appenderator = appenderator;

    log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
  }

  /** Snapshot view of the currently-active segments, keyed by interval start millis. */
  public Map<Long, SegmentIdentifier> getSegmentsView()
  {
    return ImmutableMap.copyOf(segments);
  }

  public DataSchema getSchema()
  {
    return schema;
  }

  public RealtimeTuningConfig getConfig()
  {
    return config;
  }

  public RejectionPolicy getRejectionPolicy()
  {
    return rejectionPolicy;
  }

  @Override
  public Object startJob()
  {
    handoffNotifier.start();
    Object retVal = appenderator.startJob();
    initializeExecutors();
    startPersistThread();
    // Push pending sinks bootstrapped from previous run
    mergeAndPush();
    return retVal;
  }

  /**
   * Adds a row to the appropriate segment.
   *
   * @return number of rows in that segment after the add, or -1 if the row was
   *         rejected (by rejection policy) or its segment has already begun handoff
   */
  @Override
  public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
  {
    final SegmentIdentifier identifier = getSegmentIdentifier(row.getTimestampFromEpoch());
    if (identifier == null) {
      return -1;
    }

    final int numRows;

    try {
      numRows = appenderator.add(identifier, row, committerSupplier);
      lastCommitterSupplier = committerSupplier;
      return numRows;
    }
    catch (SegmentNotWritableException e) {
      // Segment already started handoff
      return -1;
    }
  }

  @Override
  public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
  {
    // Delegate query execution directly to the appenderator.
    return new QueryRunner<T>()
    {
      @Override
      public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
      {
        return query.run(appenderator, responseContext);
      }
    };
  }

  @Override
  public void persist(final Committer committer)
  {
    final Stopwatch runExecStopwatch = Stopwatch.createStarted();
    appenderator.persistAll(committer);

    // persistAll only blocks while earlier persists are still pending; report how
    // long ingestion was back-pressured.
    final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
    metrics.incrementPersistBackPressureMillis(startDelay);
    if (startDelay > WARN_DELAY) {
      log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
    }
    runExecStopwatch.stop();
  }

  @Override
  public void finishJob()
  {
    log.info("Shutting down...");

    shuttingDown = true;

    List<SegmentIdentifier> pending = appenderator.getSegments();
    if (pending.isEmpty()) {
      log.info("No segments to hand off.");
    } else {
      log.info("Pushing segments: %s", Joiner.on(", ").join(pending));
    }

    try {
      if (lastCommitterSupplier != null) {
        // Push all remaining data
        mergeAndPush();
      }

      // Wait for all announced segments to be handed off.
      synchronized (handoffCondition) {
        while (!segments.isEmpty()) {
          log.info("Waiting to hand off: %s", Joiner.on(", ").join(pending));
          handoffCondition.wait();
          pending = appenderator.getSegments();
        }
      }
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
    finally {
      stopped = true;
      handoffNotifier.close();
      shutdownExecutors();
      appenderator.close();
    }

    if (!cleanShutdown) {
      throw new ISE("Exception occurred during persist and merge.");
    }
  }

  /**
   * Returns the segment a row with this timestamp belongs in, creating and
   * announcing a new one if necessary. Returns null if the rejection policy
   * refuses the timestamp.
   */
  private SegmentIdentifier getSegmentIdentifier(long timestamp)
  {
    if (!rejectionPolicy.accept(timestamp)) {
      return null;
    }

    final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
    final VersioningPolicy versioningPolicy = config.getVersioningPolicy();

    final long truncatedTime = segmentGranularity.truncate(new DateTime(timestamp)).getMillis();

    SegmentIdentifier retVal = segments.get(truncatedTime);

    if (retVal == null) {
      final Interval interval = new Interval(
          new DateTime(truncatedTime),
          segmentGranularity.increment(new DateTime(truncatedTime))
      );

      retVal = new SegmentIdentifier(
          schema.getDataSource(),
          interval,
          versioningPolicy.getVersion(interval),
          config.getShardSpec()
      );
      addSegment(retVal);
    }

    return retVal;
  }

  protected void initializeExecutors()
  {
    if (scheduledExecutor == null) {
      scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
    }
  }

  protected void shutdownExecutors()
  {
    if (scheduledExecutor != null) {
      scheduledExecutor.shutdown();
    }
  }

  /** Tracks the segment locally and announces it to the cluster (best-effort). */
  private void addSegment(final SegmentIdentifier identifier)
  {
    segments.put(identifier.getInterval().getStartMillis(), identifier);
    try {
      // Announcement carries no loadSpec/dimensions/metrics yet — only identity.
      segmentAnnouncer.announceSegment(
          new DataSegment(
              identifier.getDataSource(),
              identifier.getInterval(),
              identifier.getVersion(),
              ImmutableMap.<String, Object>of(),
              ImmutableList.<String>of(),
              ImmutableList.<String>of(),
              identifier.getShardSpec(),
              null,
              0
          )
      );
    }
    catch (IOException e) {
      // Announcement failure is alerted but not fatal; ingestion continues.
      log.makeAlert(e, "Failed to announce new segment[%s]", identifier.getDataSource())
         .addData("interval", identifier.getInterval())
         .emit();
    }
  }

  /** Stops tracking the segment and asynchronously drops it from the appenderator. */
  public void dropSegment(final SegmentIdentifier identifier)
  {
    log.info("Dropping segment: %s", identifier);
    segments.remove(identifier.getInterval().getStartMillis());

    Futures.addCallback(
        appenderator.drop(identifier),
        new FutureCallback<Object>()
        {
          @Override
          public void onSuccess(Object result)
          {
            log.info("Dropped segment: %s", identifier);
          }

          @Override
          public void onFailure(Throwable e)
          {
            // TODO: Retry?
            log.warn(e, "Failed to drop segment: %s", identifier);
          }
        }
    );
  }

  /**
   * Schedules the merge-and-push "overseer" to run once per segment granularity
   * period, first firing one window period after the end of the current period.
   */
  private void startPersistThread()
  {
    final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
    final Period windowPeriod = config.getWindowPeriod();

    final DateTime truncatedNow = segmentGranularity.truncate(new DateTime());
    final long windowMillis = windowPeriod.toStandardDuration().getMillis();

    log.info(
        "Expect to run at [%s]",
        new DateTime().plus(
            new Duration(
                System.currentTimeMillis(),
                segmentGranularity.increment(truncatedNow).getMillis() + windowMillis
            )
        )
    );

    ScheduledExecutors
        .scheduleAtFixedRate(
            scheduledExecutor,
            new Duration(
                System.currentTimeMillis(),
                segmentGranularity.increment(truncatedNow).getMillis() + windowMillis
            ),
            new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)),
            new ThreadRenamingCallable<ScheduledExecutors.Signal>(
                String.format(
                    "%s-overseer-%d",
                    schema.getDataSource(),
                    config.getShardSpec().getPartitionNum()
                )
            )
            {
              @Override
              public ScheduledExecutors.Signal doCall()
              {
                if (stopped) {
                  log.info("Stopping merge-n-push overseer thread");
                  return ScheduledExecutors.Signal.STOP;
                }

                mergeAndPush();

                // Re-check: finishJob() may have stopped us while we were pushing.
                if (stopped) {
                  log.info("Stopping merge-n-push overseer thread");
                  return ScheduledExecutors.Signal.STOP;
                } else {
                  return ScheduledExecutors.Signal.REPEAT;
                }
              }
            }
        );
  }

  /**
   * Pushes segments whose interval starts before (currMaxTime - windowPeriod),
   * or all segments when shutting down, then publishes each pushed segment.
   */
  private void mergeAndPush()
  {
    final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
    final Period windowPeriod = config.getWindowPeriod();

    final long windowMillis = windowPeriod.toStandardDuration().getMillis();
    log.info("Starting merge and push.");
    // Max with windowMillis keeps the subtraction from going negative.
    DateTime minTimestampAsDate = segmentGranularity.truncate(
        new DateTime(
            Math.max(
                windowMillis,
                rejectionPolicy.getCurrMaxTime().getMillis()
            ) - windowMillis
        )
    );
    long minTimestamp = minTimestampAsDate.getMillis();

    final List<SegmentIdentifier> appenderatorSegments = appenderator.getSegments();
    final List<SegmentIdentifier> segmentsToPush = Lists.newArrayList();

    if (shuttingDown) {
      log.info("Found [%,d] segments. Attempting to hand off all of them.", appenderatorSegments.size());
      segmentsToPush.addAll(appenderatorSegments);
    } else {
      log.info(
          "Found [%,d] segments. Attempting to hand off segments that start before [%s].",
          appenderatorSegments.size(),
          minTimestampAsDate
      );

      for (SegmentIdentifier segment : appenderatorSegments) {
        final Long intervalStart = segment.getInterval().getStartMillis();
        if (intervalStart < minTimestamp) {
          log.info("Adding entry [%s] for merge and push.", segment);
          segmentsToPush.add(segment);
        } else {
          log.info(
              "Skipping persist and merge for entry [%s] : Start time [%s] >= [%s] min timestamp required in this run. Segment will be picked up in a future run.",
              segment,
              new DateTime(intervalStart),
              minTimestampAsDate
          );
        }
      }
    }

    log.info("Found [%,d] segments to persist and merge", segmentsToPush.size());

    // Shared failure path: alert, and during shutdown also drop the failed
    // segments so close() does not wait on them forever.
    final Function<Throwable, Void> errorHandler = new Function<Throwable, Void>()
    {
      @Override
      public Void apply(Throwable throwable)
      {
        final List<String> segmentIdentifierStrings = Lists.transform(
            segmentsToPush,
            new Function<SegmentIdentifier, String>()
            {
              @Override
              public String apply(SegmentIdentifier input)
              {
                return input.getIdentifierAsString();
              }
            }
        );

        log.makeAlert(throwable, "Failed to publish merged indexes[%s]", schema.getDataSource())
           .addData("segments", segmentIdentifierStrings)
           .emit();

        if (shuttingDown) {
          // We're trying to shut down, and these segments failed to push. Let's just get rid of them.
          // This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
          cleanShutdown = false;
          for (SegmentIdentifier identifier : segmentsToPush) {
            dropSegment(identifier);
          }
        }

        return null;
      }
    };

    // WARNING: Committers.nil() here means that on-disk data can get out of sync with committing.
    Futures.addCallback(
        appenderator.push(segmentsToPush, Committers.nil()),
        new FutureCallback<SegmentsAndMetadata>()
        {
          @Override
          public void onSuccess(SegmentsAndMetadata result)
          {
            // Immediately publish after pushing
            for (DataSegment pushedSegment : result.getSegments()) {
              try {
                segmentPublisher.publishSegment(pushedSegment);
              }
              catch (Exception e) {
                errorHandler.apply(e);
              }
            }

            log.info("Published [%,d] sinks.", segmentsToPush.size());
          }

          @Override
          public void onFailure(Throwable e)
          {
            log.warn(e, "Failed to push [%,d] segments.", segmentsToPush.size());
            errorHandler.apply(e);
          }
        }
    );
  }
}
package io.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;

/**
 * A {@link PlumberSchool} that produces {@link AppenderatorPlumber}s. Only the
 * "appenderator" factory is part of the JSON spec; all other collaborators are
 * injected by Jackson.
 */
public class AppenderatorPlumberSchool implements PlumberSchool
{
  private final AppenderatorFactory appenderatorFactory;
  private final DataSegmentAnnouncer segmentAnnouncer;
  private final SegmentHandoffNotifierFactory handoffNotifierFactory;
  private final SegmentPublisher segmentPublisher;

  @JsonCreator
  public AppenderatorPlumberSchool(
      @JsonProperty("appenderator") AppenderatorFactory appenderatorFactory,
      @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
      @JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory,
      @JacksonInject SegmentPublisher segmentPublisher
  )
  {
    this.appenderatorFactory = appenderatorFactory;
    this.segmentAnnouncer = segmentAnnouncer;
    this.handoffNotifierFactory = handoffNotifierFactory;
    this.segmentPublisher = segmentPublisher;
  }

  /**
   * Builds a fresh Appenderator for the schema and wraps it in an
   * AppenderatorPlumber, with a handoff notifier scoped to this datasource.
   */
  @Override
  public Plumber findPlumber(
      final DataSchema schema,
      final RealtimeTuningConfig config,
      final FireDepartmentMetrics metrics
  )
  {
    final Appenderator appenderator = appenderatorFactory.build(
        schema,
        config,
        metrics
    );

    return new AppenderatorPlumber(
        schema,
        config,
        metrics,
        segmentAnnouncer,
        segmentPublisher,
        handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),
        appenderator
    );
  }

  @JsonProperty("appenderator")
  public AppenderatorFactory getAppenderatorFactory()
  {
    return appenderatorFactory;
  }
}
package io.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.emitter.service.ServiceEmitter;

import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.partition.ShardSpec;

import java.io.File;
import java.util.concurrent.ExecutorService;

/**
 * The "default" {@link AppenderatorFactory}: builds realtime Appenderators via
 * {@code Appenderators.createRealtime}, with every collaborator injected.
 *
 * NOTE(review): the constructor uses only @JacksonInject and has no @JsonCreator —
 * confirm Jackson can actually construct this type from a spec.
 */
public class DefaultAppenderatorFactory implements AppenderatorFactory
{
  private final ServiceEmitter emitter;
  private final QueryRunnerFactoryConglomerate conglomerate;
  private final DataSegmentAnnouncer segmentAnnouncer;
  private final ExecutorService queryExecutorService;
  private final DataSegmentPusher dataSegmentPusher;
  private final ObjectMapper objectMapper;
  private final IndexIO indexIO;
  private final IndexMerger indexMerger;
  private final Cache cache;
  private final CacheConfig cacheConfig;

  public DefaultAppenderatorFactory(
      @JacksonInject ServiceEmitter emitter,
      @JacksonInject QueryRunnerFactoryConglomerate conglomerate,
      @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
      @JacksonInject @Processing ExecutorService queryExecutorService,
      @JacksonInject DataSegmentPusher dataSegmentPusher,
      @JacksonInject ObjectMapper objectMapper,
      @JacksonInject IndexIO indexIO,
      @JacksonInject IndexMerger indexMerger,
      @JacksonInject Cache cache,
      @JacksonInject CacheConfig cacheConfig
  )
  {
    this.emitter = emitter;
    this.conglomerate = conglomerate;
    this.segmentAnnouncer = segmentAnnouncer;
    this.queryExecutorService = queryExecutorService;
    this.dataSegmentPusher = dataSegmentPusher;
    this.objectMapper = objectMapper;
    this.indexIO = indexIO;
    this.indexMerger = indexMerger;
    this.cache = cache;
    this.cacheConfig = cacheConfig;
  }

  /**
   * Builds a realtime Appenderator whose base persist directory is isolated per
   * datasource and per shard partition, so multiple tasks on one box don't collide.
   */
  @Override
  public Appenderator build(
      final DataSchema schema,
      final RealtimeTuningConfig config,
      final FireDepartmentMetrics metrics
  )
  {
    return Appenderators.createRealtime(
        schema,
        config.withBasePersistDirectory(
            makeBasePersistSubdirectory(
                config.getBasePersistDirectory(),
                schema.getDataSource(),
                config.getShardSpec()
            )
        ),
        metrics,
        dataSegmentPusher,
        objectMapper,
        indexIO,
        indexMerger,
        conglomerate,
        segmentAnnouncer,
        emitter,
        queryExecutorService,
        cache,
        cacheConfig
    );
  }

  // Layout: <basePersistDirectory>/<dataSource>/<partitionNum>
  private static File makeBasePersistSubdirectory(
      final File basePersistDirectory,
      final String dataSource,
      final ShardSpec shardSpec
  )
  {
    final File dataSourceDirectory = new File(basePersistDirectory, dataSource);
    return new File(dataSourceDirectory, String.valueOf(shardSpec.getPartitionNum()));
  }
}
package io.druid.segment.realtime.appenderator;

import io.druid.data.input.InputRow;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
import io.druid.segment.realtime.plumber.NoopRejectionPolicyFactory;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;

import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

/**
 * End-to-end smoke test for {@link AppenderatorPlumber}: startJob, add rows,
 * verify the segment view and row counts, then clear and finish.
 *
 * NOTE(review): generic type parameters on the EasyMock matchers were stripped by
 * text extraction and have been restored; also added the missing EasyMock.replay()
 * so the recorded expectations are actually armed before use.
 */
public class AppenderatorPlumberTest
{
  private final AppenderatorPlumber plumber;
  private final AppenderatorTester appenderatorTester;

  public AppenderatorPlumberTest() throws Exception
  {
    this.appenderatorTester = new AppenderatorTester(10);
    DataSegmentAnnouncer segmentAnnouncer = EasyMock
        .createMock(DataSegmentAnnouncer.class);
    segmentAnnouncer.announceSegment(EasyMock.<DataSegment>anyObject());
    EasyMock.expectLastCall().anyTimes();

    SegmentPublisher segmentPublisher = EasyMock
        .createNiceMock(SegmentPublisher.class);
    SegmentHandoffNotifierFactory handoffNotifierFactory = EasyMock
        .createNiceMock(SegmentHandoffNotifierFactory.class);
    SegmentHandoffNotifier handoffNotifier = EasyMock
        .createNiceMock(SegmentHandoffNotifier.class);
    EasyMock
        .expect(
            handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString())
        ).andReturn(handoffNotifier).anyTimes();
    EasyMock
        .expect(
            handoffNotifier.registerSegmentHandoffCallback(
                EasyMock.<SegmentDescriptor>anyObject(),
                EasyMock.<Executor>anyObject(),
                EasyMock.<Runnable>anyObject()
            )
        ).andReturn(true).anyTimes();

    // Switch the mocks from record state to replay state; without this the
    // expectations above are never armed.
    EasyMock.replay(segmentAnnouncer, segmentPublisher, handoffNotifierFactory, handoffNotifier);

    RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
        1,
        null,
        null,
        null,
        new IntervalStartVersioningPolicy(),
        new NoopRejectionPolicyFactory(),
        null,
        null,
        null,
        true,
        0,
        0,
        false,
        null
    );

    this.plumber = new AppenderatorPlumber(
        appenderatorTester.getSchema(),
        tuningConfig,
        appenderatorTester.getMetrics(),
        segmentAnnouncer,
        segmentPublisher,
        handoffNotifier,
        appenderatorTester.getAppenderator()
    );
  }

  @Test
  public void testSimpleIngestion() throws Exception
  {
    final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();

    Appenderator appenderator = appenderatorTester.getAppenderator();

    // startJob — no restored metadata expected on a fresh appenderator
    Assert.assertEquals(null, plumber.startJob());

    // getDataSource
    Assert.assertEquals(AppenderatorTester.DATASOURCE,
        appenderator.getDataSource());

    InputRow[] rows = new InputRow[]{
        AppenderatorTest.IR("2000", "foo", 1),
        AppenderatorTest.IR("2000", "bar", 2),
        AppenderatorTest.IR("2000", "qux", 4)
    };

    // add — each call returns the running row count of the target segment
    commitMetadata.put("x", "1");
    Assert.assertEquals(1, plumber.add(rows[0], null));

    commitMetadata.put("x", "2");
    Assert.assertEquals(2, plumber.add(rows[1], null));

    commitMetadata.put("x", "3");
    Assert.assertEquals(3, plumber.add(rows[2], null));

    // All rows share timestamp "2000", so exactly one segment should exist.
    Assert.assertEquals(1, plumber.getSegmentsView().size());

    SegmentIdentifier si = plumber.getSegmentsView().values().toArray(new SegmentIdentifier[1])[0];

    Assert.assertEquals(3, appenderator.getRowCount(si));

    appenderator.clear();
    Assert.assertTrue(appenderator.getSegments().isEmpty());

    plumber.dropSegment(si);
    plumber.finishJob();
  }
}