diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Counter.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Counter.java index 2c1985c053..a33e06bd05 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Counter.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Counter.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.cloud.dataflow.sdk.values.TypeDescriptor; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AtomicDouble; import java.util.Objects; @@ -41,6 +42,14 @@ *

Counters compare using value equality of their name, kind, and * cumulative value. Equal counters should have equal toString()s. * + *

After all possible mutations have completed, the reader should check + * {@link #isDirty} for every counter; otherwise, updates may be lost. + * + *

A counter may become dirty without a corresponding update to the value. + * This generally will occur when the calls to {@code addValue()}, {@code committing()}, + * and {@code committed()} are interleaved such that the value is updated + * between the calls to committing and the read of the value. + * * @param the type of values aggregated by this counter */ public abstract class Counter { @@ -257,6 +266,76 @@ public static interface CounterMean { @Nullable public abstract CounterMean getMean(); + /** + * Represents whether counters' values have been committed to the backend. + * + *

Runners can use this information to optimize counter updates. + * For example, if counters are committed, runners may choose to skip the updates. + * + *

Counters' state transition table: + * {@code + * Action\Current State COMMITTED DIRTY COMMITTING + * addValue() DIRTY DIRTY DIRTY + * committing() None COMMITTING None + * committed() None None COMMITTED + * } + */ + @VisibleForTesting + enum CommitState { + /** + * There are no local updates that haven't been committed to the backend. + */ + COMMITTED, + /** + * There are local updates that haven't been committed to the backend. + */ + DIRTY, + /** + * Local updates are being committed to the backend, but are still pending. + */ + COMMITTING, + } + + /** + * Returns whether the counter contains a non-committed aggregate. + */ + public boolean isDirty() { + return commitState.get() != CommitState.COMMITTED; + } + + /** + * Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}. + * + * @return true if successful. A false return indicates that the commit state + * was not in {@code CommitState.DIRTY}. + */ + public boolean committing() { + return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING); + } + + /** + * Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}. + * + * @return true if successful. + * + *

A false return indicates that the counter was updated while the commit was pending. + * That counter update may or may not have been committed, so the {@code commitState} has to + * stay in {@code CommitState.DIRTY}. + */ + public boolean committed() { + return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED); + } + + /** + * Sets the counter to {@code CommitState.DIRTY}. + * + *

Must be called at the end of {@link #addValue}, {@link #resetToValue}, + * {@link #resetMeanToValue}, and {@link #merge}. + */ + protected void setDirty() { + commitState.set(CommitState.DIRTY); + } + /** * Returns a string representation of the Counter. Useful for debugging logs. * Example return value: "ElementCount:SUM(15)". @@ -345,9 +424,13 @@ public boolean isCompatibleWith(Counter that) { /** The kind of aggregation function to apply to this counter. */ protected final AggregationKind kind; + /** The commit state of this counter. **/ + protected final AtomicReference commitState; + protected Counter(String name, AggregationKind kind) { this.name = name; this.kind = kind; + this.commitState = new AtomicReference<>(CommitState.COMMITTED); } ////////////////////////////////////////////////////////////////////////////// @@ -388,27 +471,31 @@ private LongCounter(String name, AggregationKind kind) { @Override public LongCounter addValue(Long value) { - switch (kind) { - case SUM: - aggregate.addAndGet(value); - deltaAggregate.addAndGet(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); + try { + switch (kind) { + case SUM: + aggregate.addAndGet(value); + deltaAggregate.addAndGet(value); + break; + case MEAN: + addToMeanAndSet(value, mean); + addToMeanAndSet(value, deltaMean); + break; + case MAX: + maxAndSet(value, aggregate); + maxAndSet(value, deltaAggregate); + break; + case MIN: + minAndSet(value, aggregate); + minAndSet(value, deltaAggregate); + break; + default: + throw illegalArgumentException(); + } + return this; + } finally { + setDirty(); } - return this; } private void minAndSet(Long value, AtomicLong target) { @@ -463,26 +550,34 @@ public Long getAndResetDelta() { @Override 
public Counter resetToValue(Long value) { - if (kind == MEAN) { - throw illegalArgumentException(); + try { + if (kind == MEAN) { + throw illegalArgumentException(); + } + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); } - aggregate.set(value); - deltaAggregate.set(value); - return this; } @Override public Counter resetMeanToValue(long elementCount, Long value) { - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); + try { + if (kind != MEAN) { + throw illegalArgumentException(); + } + if (elementCount < 0) { + throw new IllegalArgumentException("elementCount must be non-negative"); + } + LongCounterMean counterMean = new LongCounterMean(value, elementCount); + mean.set(counterMean); + deltaMean.set(counterMean); + return this; + } finally { + setDirty(); } - LongCounterMean counterMean = new LongCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; } @Override @@ -504,20 +599,25 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean thisCounterMean = this.getMean(); - CounterMean thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); + try { + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + switch (kind) { + case SUM: + case MIN: + case MAX: + return addValue(that.getAggregate()); + case MEAN: + CounterMean thisCounterMean = this.getMean(); + CounterMean thatCounterMean = that.getMean(); + 
return resetMeanToValue( + thisCounterMean.getCount() + thatCounterMean.getCount(), + thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); + default: + throw illegalArgumentException(); + } + } finally { + setDirty(); } } @@ -583,27 +683,31 @@ private DoubleCounter(String name, AggregationKind kind) { @Override public DoubleCounter addValue(Double value) { - switch (kind) { - case SUM: - aggregate.addAndGet(value); - deltaAggregate.addAndGet(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); + try { + switch (kind) { + case SUM: + aggregate.addAndGet(value); + deltaAggregate.addAndGet(value); + break; + case MEAN: + addToMeanAndSet(value, mean); + addToMeanAndSet(value, deltaMean); + break; + case MAX: + maxAndSet(value, aggregate); + maxAndSet(value, deltaAggregate); + break; + case MIN: + minAndSet(value, aggregate); + minAndSet(value, deltaAggregate); + break; + default: + throw illegalArgumentException(); + } + return this; + } finally { + setDirty(); } - return this; } private void addToMeanAndSet(Double value, AtomicReference target) { @@ -649,26 +753,34 @@ public Double getAndResetDelta() { @Override public Counter resetToValue(Double value) { - if (kind == MEAN) { - throw illegalArgumentException(); + try { + if (kind == MEAN) { + throw illegalArgumentException(); + } + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); } - aggregate.set(value); - deltaAggregate.set(value); - return this; } @Override public Counter resetMeanToValue(long elementCount, Double value) { - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); + 
try { + if (kind != MEAN) { + throw illegalArgumentException(); + } + if (elementCount < 0) { + throw new IllegalArgumentException("elementCount must be non-negative"); + } + DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount); + mean.set(counterMean); + deltaMean.set(counterMean); + return this; + } finally { + setDirty(); } - DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; } @Override @@ -699,20 +811,25 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean thisCounterMean = this.getMean(); - CounterMean thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); + try { + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + switch (kind) { + case SUM: + case MIN: + case MAX: + return addValue(that.getAggregate()); + case MEAN: + CounterMean thisCounterMean = this.getMean(); + CounterMean thatCounterMean = that.getMean(); + return resetMeanToValue( + thisCounterMean.getCount() + thatCounterMean.getCount(), + thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); + default: + throw illegalArgumentException(); + } + } finally { + setDirty(); } } @@ -760,14 +877,18 @@ private BooleanCounter(String name, AggregationKind kind) { @Override public BooleanCounter addValue(Boolean value) { - if (kind.equals(AND) && !value) { - aggregate.set(value); - deltaAggregate.set(value); - } else if (kind.equals(OR) && value) { - aggregate.set(value); - deltaAggregate.set(value); + try { + if 
(kind.equals(AND) && !value) { + aggregate.set(value); + deltaAggregate.set(value); + } else if (kind.equals(OR) && value) { + aggregate.set(value); + deltaAggregate.set(value); + } + return this; + } finally { + setDirty(); } - return this; } @Override @@ -784,9 +905,13 @@ public Boolean getAndResetDelta() { @Override public Counter resetToValue(Boolean value) { - aggregate.set(value); - deltaAggregate.set(value); - return this; + try { + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); + } } @Override @@ -812,8 +937,13 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - return addValue(that.getAggregate()); + try { + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + return addValue(that.getAggregate()); + } finally { + setDirty(); + } } } @@ -887,7 +1017,8 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); switch (kind) { default: throw illegalArgumentException(); @@ -931,27 +1062,31 @@ private IntegerCounter(String name, AggregationKind kind) { @Override public IntegerCounter addValue(Integer value) { - switch (kind) { - case SUM: - aggregate.getAndAdd(value); - deltaAggregate.getAndAdd(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); + try { + switch (kind) { + case SUM: + aggregate.getAndAdd(value); + deltaAggregate.getAndAdd(value); + break; 
+ case MEAN: + addToMeanAndSet(value, mean); + addToMeanAndSet(value, deltaMean); + break; + case MAX: + maxAndSet(value, aggregate); + maxAndSet(value, deltaAggregate); + break; + case MIN: + minAndSet(value, aggregate); + minAndSet(value, deltaAggregate); + break; + default: + throw illegalArgumentException(); + } + return this; + } finally { + setDirty(); } - return this; } private void addToMeanAndSet(int value, AtomicReference target) { @@ -997,26 +1132,34 @@ public Integer getAndResetDelta() { @Override public Counter resetToValue(Integer value) { - if (kind == MEAN) { - throw illegalArgumentException(); + try { + if (kind == MEAN) { + throw illegalArgumentException(); + } + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); } - aggregate.set(value); - deltaAggregate.set(value); - return this; } @Override public Counter resetMeanToValue(long elementCount, Integer value) { - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); + try { + if (kind != MEAN) { + throw illegalArgumentException(); + } + if (elementCount < 0) { + throw new IllegalArgumentException("elementCount must be non-negative"); + } + IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount); + mean.set(counterMean); + deltaMean.set(counterMean); + return this; + } finally { + setDirty(); } - IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; } @Override @@ -1047,20 +1190,25 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean thisCounterMean = this.getMean(); - CounterMean thatCounterMean = 
that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); + try { + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + switch (kind) { + case SUM: + case MIN: + case MAX: + return addValue(that.getAggregate()); + case MEAN: + CounterMean thisCounterMean = this.getMean(); + CounterMean thatCounterMean = that.getMean(); + return resetMeanToValue( + thisCounterMean.getCount() + thatCounterMean.getCount(), + thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); + default: + throw illegalArgumentException(); + } + } finally { + setDirty(); } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/common/CounterTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/common/CounterTest.java index 619f523445..b47050b52c 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/common/CounterTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/common/CounterTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import com.google.cloud.dataflow.sdk.util.common.Counter.CommitState; import com.google.cloud.dataflow.sdk.util.common.Counter.CounterMean; import org.junit.Rule; @@ -586,4 +587,148 @@ private void assertIncompatibleMerge(Counter left, Counter right) { thrown.expectMessage("are incompatible"); left.merge(right); } + + @Test + public void testDirtyBit() { + Counter longSum = Counter.longs("long-sum", SUM); + Counter longMean = Counter.longs("long-mean", MEAN); + Counter doubleSum = Counter.doubles("double-sum", SUM); + Counter doubleMean = Counter.doubles("double-sum", MEAN); + Counter intSum = Counter.ints("int-sum", SUM); + Counter intMean = Counter.ints("int-sum", MEAN); + Counter boolAnd = Counter.booleans("and", AND); + + // Test counters are 
not dirty and are COMMITTED initially. + assertFalse(longSum.isDirty()); + assertFalse(longMean.isDirty()); + assertFalse(doubleSum.isDirty()); + assertFalse(doubleMean.isDirty()); + assertFalse(intSum.isDirty()); + assertFalse(intMean.isDirty()); + assertFalse(boolAnd.isDirty()); + + assertEquals(CommitState.COMMITTED, longSum.commitState.get()); + assertEquals(CommitState.COMMITTED, longMean.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleSum.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleMean.commitState.get()); + assertEquals(CommitState.COMMITTED, intSum.commitState.get()); + assertEquals(CommitState.COMMITTED, intMean.commitState.get()); + assertEquals(CommitState.COMMITTED, boolAnd.commitState.get()); + + // Test counters are dirty after mutating. + longSum.addValue(1L); + longMean.resetMeanToValue(1L, 1L); + doubleSum.addValue(1.0); + doubleMean.resetMeanToValue(1L, 1.0); + intSum.addValue(1); + intMean.resetMeanToValue(1, 1); + boolAnd.addValue(true); + + assertTrue(longSum.isDirty()); + assertTrue(longMean.isDirty()); + assertTrue(doubleSum.isDirty()); + assertTrue(doubleMean.isDirty()); + assertTrue(intSum.isDirty()); + assertTrue(intMean.isDirty()); + assertTrue(boolAnd.isDirty()); + + assertEquals(CommitState.DIRTY, longSum.commitState.get()); + assertEquals(CommitState.DIRTY, longMean.commitState.get()); + assertEquals(CommitState.DIRTY, doubleSum.commitState.get()); + assertEquals(CommitState.DIRTY, doubleMean.commitState.get()); + assertEquals(CommitState.DIRTY, intSum.commitState.get()); + assertEquals(CommitState.DIRTY, intMean.commitState.get()); + assertEquals(CommitState.DIRTY, boolAnd.commitState.get()); + + // Test counters are dirty and are COMMITTING. 
+ assertTrue(longSum.committing()); + assertTrue(longMean.committing()); + assertTrue(doubleSum.committing()); + assertTrue(doubleMean.committing()); + assertTrue(intSum.committing()); + assertTrue(intMean.committing()); + assertTrue(boolAnd.committing()); + + assertTrue(longSum.isDirty()); + assertTrue(longMean.isDirty()); + assertTrue(doubleSum.isDirty()); + assertTrue(doubleMean.isDirty()); + assertTrue(intSum.isDirty()); + assertTrue(intMean.isDirty()); + assertTrue(boolAnd.isDirty()); + + assertEquals(CommitState.COMMITTING, longSum.commitState.get()); + assertEquals(CommitState.COMMITTING, longMean.commitState.get()); + assertEquals(CommitState.COMMITTING, doubleSum.commitState.get()); + assertEquals(CommitState.COMMITTING, doubleMean.commitState.get()); + assertEquals(CommitState.COMMITTING, intSum.commitState.get()); + assertEquals(CommitState.COMMITTING, intMean.commitState.get()); + assertEquals(CommitState.COMMITTING, boolAnd.commitState.get()); + + // Test counters are dirty again after mutating. 
+ longSum.addValue(1L); + longMean.resetMeanToValue(1L, 1L); + doubleSum.addValue(1.0); + doubleMean.resetMeanToValue(1L, 1.0); + intSum.addValue(1); + intMean.resetMeanToValue(1, 1); + boolAnd.addValue(true); + + assertFalse(longSum.committed()); + assertFalse(longMean.committed()); + assertFalse(doubleSum.committed()); + assertFalse(doubleMean.committed()); + assertFalse(intSum.committed()); + assertFalse(intMean.committed()); + assertFalse(boolAnd.committed()); + + assertTrue(longSum.isDirty()); + assertTrue(longMean.isDirty()); + assertTrue(doubleSum.isDirty()); + assertTrue(doubleMean.isDirty()); + assertTrue(intSum.isDirty()); + assertTrue(intMean.isDirty()); + assertTrue(boolAnd.isDirty()); + + assertEquals(CommitState.DIRTY, longSum.commitState.get()); + assertEquals(CommitState.DIRTY, longMean.commitState.get()); + assertEquals(CommitState.DIRTY, doubleSum.commitState.get()); + assertEquals(CommitState.DIRTY, doubleMean.commitState.get()); + assertEquals(CommitState.DIRTY, intSum.commitState.get()); + assertEquals(CommitState.DIRTY, intMean.commitState.get()); + assertEquals(CommitState.DIRTY, boolAnd.commitState.get()); + + // Test counters are not dirty and are COMMITTED. 
+ assertTrue(longSum.committing()); + assertTrue(longMean.committing()); + assertTrue(doubleSum.committing()); + assertTrue(doubleMean.committing()); + assertTrue(intSum.committing()); + assertTrue(intMean.committing()); + assertTrue(boolAnd.committing()); + + assertTrue(longSum.committed()); + assertTrue(longMean.committed()); + assertTrue(doubleSum.committed()); + assertTrue(doubleMean.committed()); + assertTrue(intSum.committed()); + assertTrue(intMean.committed()); + assertTrue(boolAnd.committed()); + + assertFalse(longSum.isDirty()); + assertFalse(longMean.isDirty()); + assertFalse(doubleSum.isDirty()); + assertFalse(doubleMean.isDirty()); + assertFalse(intSum.isDirty()); + assertFalse(intMean.isDirty()); + assertFalse(boolAnd.isDirty()); + + assertEquals(CommitState.COMMITTED, longSum.commitState.get()); + assertEquals(CommitState.COMMITTED, longMean.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleSum.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleMean.commitState.get()); + assertEquals(CommitState.COMMITTED, intSum.commitState.get()); + assertEquals(CommitState.COMMITTED, intMean.commitState.get()); + assertEquals(CommitState.COMMITTED, boolAnd.commitState.get()); + } }