Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ public boolean requiresDeduping() {
return dedup;
}

private class CountingSourceReader extends UnboundedReader<KV<Integer, Integer>> {
/**
* Public only so that the checkpoint can be conveyed from {@link #getCheckpointMark()} to
* {@link TestCountingSource#createReader(PipelineOptions, CounterMark)} without cast.
*/
public class CountingSourceReader extends UnboundedReader<KV<Integer, Integer>> {
private int current;

public CountingSourceReader(int startingPoint) {
Expand All @@ -164,21 +168,20 @@ public CountingSourceReader(int startingPoint) {

@Override
public boolean start() {
return true;
return advance();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Contractually you can only be called to start once, use another variable to track whether you have been started and throw an error if you have been called multiple times.

Also, check that in advance, that start has been called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guy is on the way out - it should be replaced by CountingSource, so I'm not going to go to town here.

}

@Override
public boolean advance() {
if (current < numMessagesPerShard - 1) {
// If testing dedup, occasionally insert a duplicate value;
if (dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
return true;
}
current++;
return true;
} else {
if (current >= numMessagesPerShard) {
return false;
}
// If testing dedup, occasionally insert a duplicate value;
if (current >= 0 && dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this produce duplicates before the checkpoint mark?

So we will output 0, then 1, then checkpoint that we just output 1, and then we will potentially output 1 again after restoring from checkpoint?

It seems as though restoring from checkpoint should guarantee that the previously emit values won't be emit again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, the implementation is broken, but already was broken.

return true;
}
current++;
return current < numMessagesPerShard;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the indentation looks off here, but maybe it's a tabs-vs-spaces thing? Except we shouldn't have tabs.. hmm.. can you please investigate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}

@Override
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that all the get/close calls ensure that start has been called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

Expand Down Expand Up @@ -216,12 +219,14 @@ public Instant getWatermark() {
}

@Override
public CheckpointMark getCheckpointMark() {
public CounterMark getCheckpointMark() {
if (throwOnFirstSnapshot && !thrown) {
thrown = true;
LOG.error("Throwing exception while checkpointing counter");
throw new RuntimeException("failed during checkpoint");
}
// The checkpoint can assume all records read, including the current, have
// been commited.
return new CounterMark(current);
}

Expand All @@ -234,7 +239,12 @@ public long getSplitBacklogBytes() {
@Override
public CountingSourceReader createReader(
PipelineOptions options, @Nullable CounterMark checkpointMark) {
return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : 0);
if (checkpointMark == null) {
LOG.debug("creating reader");
} else {
LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current);
}
return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.runners.dataflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.IOException;

/**
* Test the TestCountingSource.
*/
@RunWith(JUnit4.class)
public class TestCountingSourceTest {
@Test
public void testRespectsCheckpointContract() throws IOException {
TestCountingSource source = new TestCountingSource(3);
PipelineOptions options = PipelineOptionsFactory.create();
TestCountingSource.CountingSourceReader reader =
source.createReader(options, null /* no checkpoint */);
assertTrue(reader.start());
assertEquals(0L, (long) reader.getCurrent().getValue());
assertTrue(reader.advance());
assertEquals(1L, (long) reader.getCurrent().getValue());
TestCountingSource.CounterMark checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
reader = source.createReader(options, checkpoint);
assertTrue(reader.start());
assertEquals(2L, (long) reader.getCurrent().getValue());
assertFalse(reader.advance());
}
}