From 99c76c1742c33d4cf9ed1bd63504e5c567c65d0c Mon Sep 17 00:00:00 2001 From: Pei He Date: Tue, 19 Apr 2016 18:21:40 -0700 Subject: [PATCH 1/6] Add DirtyBit to represent whether Counters have been committed. --- .../apache/beam/sdk/util/common/Counter.java | 70 +++++++++ .../beam/sdk/util/common/CounterTest.java | 146 +++++++++++++++++- 2 files changed, 215 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java index 6024576cabce..f8d133442890 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AtomicDouble; import java.util.Objects; @@ -294,6 +295,56 @@ public static interface CounterMean { @Nullable public abstract CounterMean getMean(); + /** + * DirtyBit represents whether counters' values have been committed to the backend. + * + *

Runners can use this information to optimize counters updates. + * For example, if counters are not dirty, runners may choose to skip the updates. + * + *

Counters' state transition table: + * Action\Current State COMMITTED DIRTY COMMITTING + * add() DIRTY DIRTY DIRTY + * committing() None COMMITTING None + * committed() None None COMMITTED + */ + @VisibleForTesting + enum DirtyBit { + COMMITTED, + DIRTY, + COMMITTING, + } + + /** + * Returns if the counter contains non-committed aggregate. + */ + public boolean isDirty() { + return dirty.get() != DirtyBit.COMMITTED; + } + + /** + * Changes the counter from {@code DirtyBit.DIRTY} to {@code DirtyBit.COMMITTING}. + */ + public boolean committing() { + return dirty.compareAndSet(DirtyBit.DIRTY, DirtyBit.COMMITTING); + } + + /** + * Changes the counter from {@code DirtyBit.COMMITTING} to {@code DirtyBit.COMMITTED}. + */ + public boolean committed() { + return dirty.compareAndSet(DirtyBit.COMMITTING, DirtyBit.COMMITTED); + } + + /** + * Sets the counter to {@code DirtyBit.DIRTY}. + * + *

It needs to be called at the beginning of {@link #addValue}, {@link #resetToValue}, + * {@link #resetMeanToValue}, and {@link #merge}. + */ + protected void setDirty() { + dirty.set(DirtyBit.DIRTY); + } + /** * Returns a string representation of the Counter. Useful for debugging logs. * Example return value: "ElementCount:SUM(15)". @@ -382,9 +433,13 @@ public boolean isCompatibleWith(Counter that) { /** The kind of aggregation function to apply to this counter. */ protected final AggregationKind kind; + /** The dirty bit of this counter. **/ + protected final AtomicReference dirty; + protected Counter(CounterName name, AggregationKind kind) { this.name = name; this.kind = kind; + this.dirty = new AtomicReference<>(DirtyBit.COMMITTED); } ////////////////////////////////////////////////////////////////////////////// @@ -425,6 +480,7 @@ private LongCounter(CounterName name, AggregationKind kind) { @Override public LongCounter addValue(Long value) { + setDirty(); switch (kind) { case SUM: aggregate.addAndGet(value); @@ -500,6 +556,7 @@ public Long getAndResetDelta() { @Override public Counter resetToValue(Long value) { + setDirty(); if (kind == MEAN) { throw illegalArgumentException(); } @@ -510,6 +567,7 @@ public Counter resetToValue(Long value) { @Override public Counter resetMeanToValue(long elementCount, Long value) { + setDirty(); if (kind != MEAN) { throw illegalArgumentException(); } @@ -541,6 +599,7 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { + setDirty(); checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); switch (kind) { case SUM: @@ -620,6 +679,7 @@ private DoubleCounter(CounterName name, AggregationKind kind) { @Override public DoubleCounter addValue(Double value) { + setDirty(); switch (kind) { case SUM: aggregate.addAndGet(value); @@ -686,6 +746,7 @@ public Double getAndResetDelta() { @Override public Counter resetToValue(Double value) { + setDirty(); if (kind == MEAN) { throw illegalArgumentException(); } @@ -696,6 +757,7 @@ public Counter resetToValue(Double value) { @Override public Counter resetMeanToValue(long elementCount, Double value) { + setDirty(); if (kind != MEAN) { throw illegalArgumentException(); } @@ -736,6 +798,7 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { + setDirty(); checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); switch (kind) { case SUM: @@ -797,6 +860,7 @@ private BooleanCounter(CounterName name, AggregationKind kind) { @Override public BooleanCounter addValue(Boolean value) { + setDirty(); if (kind.equals(AND) && !value) { aggregate.set(value); deltaAggregate.set(value); @@ -821,6 +885,7 @@ public Boolean getAndResetDelta() { @Override public Counter resetToValue(Boolean value) { + setDirty(); aggregate.set(value); deltaAggregate.set(value); return this; @@ -849,6 +914,7 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { + setDirty(); checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); return addValue(that.getAggregate()); } @@ -968,6 +1034,7 @@ private IntegerCounter(CounterName name, AggregationKind kind) { @Override public IntegerCounter addValue(Integer value) { + setDirty(); switch (kind) { case SUM: aggregate.getAndAdd(value); @@ -1034,6 +1101,7 @@ public Integer getAndResetDelta() { @Override public Counter resetToValue(Integer value) { + setDirty(); if (kind == MEAN) { throw illegalArgumentException(); } @@ -1044,6 +1112,7 @@ public Counter resetToValue(Integer value) { @Override public Counter resetMeanToValue(long elementCount, Integer value) { + setDirty(); if (kind != MEAN) { throw illegalArgumentException(); } @@ -1084,6 +1153,7 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { + setDirty(); checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); switch (kind) { case SUM: diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java index 5f75bb816fec..3bf78ca0d97e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java @@ -30,7 +30,7 @@ import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.util.common.Counter.CounterMean; - +import org.apache.beam.sdk.util.common.Counter.DirtyBit; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -588,4 +588,148 @@ private void assertIncompatibleMerge(Counter left, Counter right) { thrown.expectMessage("are incompatible"); left.merge(right); } + + @Test + public void testDirtyBit() { + Counter longSum = Counter.longs("long-sum", SUM); + Counter longMean = Counter.longs("long-mean", MEAN); + Counter doubleSum = Counter.doubles("double-sum", SUM); + Counter doubleMean = Counter.doubles("double-sum", MEAN); + Counter intSum = Counter.ints("int-sum", SUM); + Counter intMean = Counter.ints("int-sum", MEAN); + Counter boolAnd = Counter.booleans("and", AND); + + // Test counters are not dirty and are COMMITTED initially. + assertFalse(longSum.isDirty()); + assertFalse(longMean.isDirty()); + assertFalse(doubleSum.isDirty()); + assertFalse(doubleMean.isDirty()); + assertFalse(intSum.isDirty()); + assertFalse(intMean.isDirty()); + assertFalse(boolAnd.isDirty()); + + assertEquals(DirtyBit.COMMITTED, longSum.dirty.get()); + assertEquals(DirtyBit.COMMITTED, longMean.dirty.get()); + assertEquals(DirtyBit.COMMITTED, doubleSum.dirty.get()); + assertEquals(DirtyBit.COMMITTED, doubleMean.dirty.get()); + assertEquals(DirtyBit.COMMITTED, intSum.dirty.get()); + assertEquals(DirtyBit.COMMITTED, intMean.dirty.get()); + assertEquals(DirtyBit.COMMITTED, boolAnd.dirty.get()); + + // Test counters are dirty after mutating. + longSum.addValue(1L); + longMean.resetMeanToValue(1L, 1L); + doubleSum.addValue(1.0); + doubleMean.resetMeanToValue(1L, 1.0); + intSum.addValue(1); + intMean.resetMeanToValue(1, 1); + boolAnd.addValue(true); + + assertTrue(longSum.isDirty()); + assertTrue(longMean.isDirty()); + assertTrue(doubleSum.isDirty()); + assertTrue(doubleMean.isDirty()); + assertTrue(intSum.isDirty()); + assertTrue(intMean.isDirty()); + assertTrue(boolAnd.isDirty()); + + assertEquals(DirtyBit.DIRTY, longSum.dirty.get()); + assertEquals(DirtyBit.DIRTY, longMean.dirty.get()); + assertEquals(DirtyBit.DIRTY, doubleSum.dirty.get()); + assertEquals(DirtyBit.DIRTY, doubleMean.dirty.get()); + assertEquals(DirtyBit.DIRTY, intSum.dirty.get()); + assertEquals(DirtyBit.DIRTY, intMean.dirty.get()); + assertEquals(DirtyBit.DIRTY, boolAnd.dirty.get()); + + // Test counters are dirty and are COMMITTING. + longSum.committing(); + longMean.committing(); + doubleSum.committing(); + doubleMean.committing(); + intSum.committing(); + intMean.committing(); + boolAnd.committing(); + + assertTrue(longSum.isDirty()); + assertTrue(longMean.isDirty()); + assertTrue(doubleSum.isDirty()); + assertTrue(doubleMean.isDirty()); + assertTrue(intSum.isDirty()); + assertTrue(intMean.isDirty()); + assertTrue(boolAnd.isDirty()); + + assertEquals(DirtyBit.COMMITTING, longSum.dirty.get()); + assertEquals(DirtyBit.COMMITTING, longMean.dirty.get()); + assertEquals(DirtyBit.COMMITTING, doubleSum.dirty.get()); + assertEquals(DirtyBit.COMMITTING, doubleMean.dirty.get()); + assertEquals(DirtyBit.COMMITTING, intSum.dirty.get()); + assertEquals(DirtyBit.COMMITTING, intMean.dirty.get()); + assertEquals(DirtyBit.COMMITTING, boolAnd.dirty.get()); + + // Test counters are dirty again after mutating. + longSum.addValue(1L); + longMean.resetMeanToValue(1L, 1L); + doubleSum.addValue(1.0); + doubleMean.resetMeanToValue(1L, 1.0); + intSum.addValue(1); + intMean.resetMeanToValue(1, 1); + boolAnd.addValue(true); + + longSum.committed(); + longMean.committed(); + doubleSum.committed(); + doubleMean.committed(); + intSum.committed(); + intMean.committed(); + boolAnd.committed(); + + assertTrue(longSum.isDirty()); + assertTrue(longMean.isDirty()); + assertTrue(doubleSum.isDirty()); + assertTrue(doubleMean.isDirty()); + assertTrue(intSum.isDirty()); + assertTrue(intMean.isDirty()); + assertTrue(boolAnd.isDirty()); + + assertEquals(DirtyBit.DIRTY, longSum.dirty.get()); + assertEquals(DirtyBit.DIRTY, longMean.dirty.get()); + assertEquals(DirtyBit.DIRTY, doubleSum.dirty.get()); + assertEquals(DirtyBit.DIRTY, doubleMean.dirty.get()); + assertEquals(DirtyBit.DIRTY, intSum.dirty.get()); + assertEquals(DirtyBit.DIRTY, intMean.dirty.get()); + assertEquals(DirtyBit.DIRTY, boolAnd.dirty.get()); + + // Test counters are not dirty and are COMMITTED. + longSum.committing(); + longMean.committing(); + doubleSum.committing(); + doubleMean.committing(); + intSum.committing(); + intMean.committing(); + boolAnd.committing(); + + longSum.committed(); + longMean.committed(); + doubleSum.committed(); + doubleMean.committed(); + intSum.committed(); + intMean.committed(); + boolAnd.committed(); + + assertFalse(longSum.isDirty()); + assertFalse(longMean.isDirty()); + assertFalse(doubleSum.isDirty()); + assertFalse(doubleMean.isDirty()); + assertFalse(intSum.isDirty()); + assertFalse(intMean.isDirty()); + assertFalse(boolAnd.isDirty()); + + assertEquals(DirtyBit.COMMITTED, longSum.dirty.get()); + assertEquals(DirtyBit.COMMITTED, longMean.dirty.get()); + assertEquals(DirtyBit.COMMITTED, doubleSum.dirty.get()); + assertEquals(DirtyBit.COMMITTED, doubleMean.dirty.get()); + assertEquals(DirtyBit.COMMITTED, intSum.dirty.get()); + assertEquals(DirtyBit.COMMITTED, intMean.dirty.get()); + assertEquals(DirtyBit.COMMITTED, boolAnd.dirty.get()); + } } From 09f4097ff3bd128cc385c0d1875d63b065e2d300 Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 20 Apr 2016 16:07:50 -0700 Subject: [PATCH 2/6] fixup to address comments --- .../apache/beam/sdk/util/common/Counter.java | 413 ++++++++++-------- .../beam/sdk/util/common/CounterTest.java | 130 +++--- 2 files changed, 294 insertions(+), 249 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java index f8d133442890..4a9506c77ed9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java @@ -296,10 +296,10 @@ public static interface CounterMean { public abstract CounterMean getMean(); /** - * DirtyBit represents whether counters' values have been committed to the backend. + * CommitState represents whether counters' values have been committed to the backend. * *

Runners can use this information to optimize counters updates. - * For example, if counters are not dirty, runners may choose to skip the updates. + * For example, if counters are committed, runners may choose to skip the updates. * *

Counters' state transition table: * Action\Current State COMMITTED DIRTY COMMITTING @@ -308,7 +308,7 @@ public static interface CounterMean { * committed() None None COMMITTED */ @VisibleForTesting - enum DirtyBit { + enum CommitState { COMMITTED, DIRTY, COMMITTING, @@ -318,31 +318,31 @@ enum DirtyBit { * Returns if the counter contains non-committed aggregate. */ public boolean isDirty() { - return dirty.get() != DirtyBit.COMMITTED; + return commitState.get() != CommitState.COMMITTED; } /** - * Changes the counter from {@code DirtyBit.DIRTY} to {@code DirtyBit.COMMITTING}. + * Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}. */ public boolean committing() { - return dirty.compareAndSet(DirtyBit.DIRTY, DirtyBit.COMMITTING); + return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING); } /** - * Changes the counter from {@code DirtyBit.COMMITTING} to {@code DirtyBit.COMMITTED}. + * Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}. */ public boolean committed() { - return dirty.compareAndSet(DirtyBit.COMMITTING, DirtyBit.COMMITTED); + return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED); } /** - * Sets the counter to {@code DirtyBit.DIRTY}. + * Sets the counter to {@code CommitState.DIRTY}. * *

It needs to be called at the beginning of {@link #addValue}, {@link #resetToValue}, * {@link #resetMeanToValue}, and {@link #merge}. */ protected void setDirty() { - dirty.set(DirtyBit.DIRTY); + commitState.set(CommitState.DIRTY); } /** @@ -433,13 +433,13 @@ public boolean isCompatibleWith(Counter that) { /** The kind of aggregation function to apply to this counter. */ protected final AggregationKind kind; - /** The dirty bit of this counter. **/ - protected final AtomicReference dirty; + /** The commit state of this counter. **/ + protected final AtomicReference commitState; protected Counter(CounterName name, AggregationKind kind) { this.name = name; this.kind = kind; - this.dirty = new AtomicReference<>(DirtyBit.COMMITTED); + this.commitState = new AtomicReference<>(CommitState.COMMITTED); } ////////////////////////////////////////////////////////////////////////////// @@ -480,28 +480,31 @@ private LongCounter(CounterName name, AggregationKind kind) { @Override public LongCounter addValue(Long value) { - setDirty(); - switch (kind) { - case SUM: - aggregate.addAndGet(value); - deltaAggregate.addAndGet(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); + try { + switch (kind) { + case SUM: + aggregate.addAndGet(value); + deltaAggregate.addAndGet(value); + break; + case MEAN: + addToMeanAndSet(value, mean); + addToMeanAndSet(value, deltaMean); + break; + case MAX: + maxAndSet(value, aggregate); + maxAndSet(value, deltaAggregate); + break; + case MIN: + minAndSet(value, aggregate); + minAndSet(value, deltaAggregate); + break; + default: + throw illegalArgumentException(); + } + return this; + } finally { + setDirty(); } - return this; } private void minAndSet(Long value, AtomicLong target) { @@ -556,28 +559,34 @@ public Long getAndResetDelta() { @Override public Counter resetToValue(Long value) { - setDirty(); - if (kind == MEAN) { - throw illegalArgumentException(); + try { + if (kind == MEAN) { + throw illegalArgumentException(); + } + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); } - aggregate.set(value); - deltaAggregate.set(value); - return this; } @Override public Counter resetMeanToValue(long elementCount, Long value) { - setDirty(); - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); + try { + if (kind != MEAN) { + throw illegalArgumentException(); + } + if (elementCount < 0) { + throw new IllegalArgumentException("elementCount must be non-negative"); + } + LongCounterMean counterMean = new LongCounterMean(value, elementCount); + mean.set(counterMean); + deltaMean.set(counterMean); + return this; + } finally { + setDirty(); } - LongCounterMean counterMean = new LongCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; } @Override @@ -599,21 +608,24 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { - setDirty(); - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean thisCounterMean = this.getMean(); - CounterMean thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); + try { + checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + switch (kind) { + case SUM: + case MIN: + case MAX: + return addValue(that.getAggregate()); + case MEAN: + CounterMean thisCounterMean = this.getMean(); + CounterMean thatCounterMean = that.getMean(); + return resetMeanToValue( + thisCounterMean.getCount() + thatCounterMean.getCount(), + thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); + default: + throw illegalArgumentException(); + } + } finally { + setDirty(); } } @@ -679,28 +691,31 @@ private DoubleCounter(CounterName name, AggregationKind kind) { @Override public DoubleCounter addValue(Double value) { - setDirty(); - switch (kind) { - case SUM: - aggregate.addAndGet(value); - deltaAggregate.addAndGet(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); + try { + switch (kind) { + case SUM: + aggregate.addAndGet(value); + deltaAggregate.addAndGet(value); + break; + case MEAN: + addToMeanAndSet(value, mean); + addToMeanAndSet(value, deltaMean); + break; + case MAX: + maxAndSet(value, aggregate); + maxAndSet(value, deltaAggregate); + break; + case MIN: + minAndSet(value, aggregate); + minAndSet(value, deltaAggregate); + break; + default: + throw illegalArgumentException(); + } + return this; + } finally { + setDirty(); } - return this; } private void addToMeanAndSet(Double value, AtomicReference target) { @@ -746,28 +761,34 @@ public Double getAndResetDelta() { @Override public Counter resetToValue(Double value) { - setDirty(); - if (kind == MEAN) { - throw illegalArgumentException(); + try { + if (kind == MEAN) { + throw illegalArgumentException(); + } + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); } - aggregate.set(value); - deltaAggregate.set(value); - return this; } @Override public Counter resetMeanToValue(long elementCount, Double value) { - setDirty(); - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); + try { + if (kind != MEAN) { + throw illegalArgumentException(); + } + if (elementCount < 0) { + throw new IllegalArgumentException("elementCount must be non-negative"); + } + DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount); + mean.set(counterMean); + deltaMean.set(counterMean); + return this; + } finally { + setDirty(); } - DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; } @Override @@ -798,21 +819,24 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { - setDirty(); - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean thisCounterMean = this.getMean(); - CounterMean thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); + try { + checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + switch (kind) { + case SUM: + case MIN: + case MAX: + return addValue(that.getAggregate()); + case MEAN: + CounterMean thisCounterMean = this.getMean(); + CounterMean thatCounterMean = that.getMean(); + return resetMeanToValue( + thisCounterMean.getCount() + thatCounterMean.getCount(), + thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); + default: + throw illegalArgumentException(); + } + } finally { + setDirty(); } } @@ -860,15 +884,18 @@ private BooleanCounter(CounterName name, AggregationKind kind) { @Override public BooleanCounter addValue(Boolean value) { - setDirty(); - if (kind.equals(AND) && !value) { - aggregate.set(value); - deltaAggregate.set(value); - } else if (kind.equals(OR) && value) { - aggregate.set(value); - deltaAggregate.set(value); + try { + if (kind.equals(AND) && !value) { + aggregate.set(value); + deltaAggregate.set(value); + } else if (kind.equals(OR) && value) { + aggregate.set(value); + deltaAggregate.set(value); + } + return this; + } finally { + setDirty(); } - return this; } @Override @@ -885,10 +912,13 @@ public Boolean getAndResetDelta() { @Override public Counter resetToValue(Boolean value) { - setDirty(); - aggregate.set(value); - deltaAggregate.set(value); - return this; + try { + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); + } } @Override @@ -914,9 +944,12 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { - setDirty(); - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - return addValue(that.getAggregate()); + try { + checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + return addValue(that.getAggregate()); + } finally { + setDirty(); + } } } @@ -1034,28 +1067,31 @@ private IntegerCounter(CounterName name, AggregationKind kind) { @Override public IntegerCounter addValue(Integer value) { - setDirty(); - switch (kind) { - case SUM: - aggregate.getAndAdd(value); - deltaAggregate.getAndAdd(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); + try { + switch (kind) { + case SUM: + aggregate.getAndAdd(value); + deltaAggregate.getAndAdd(value); + break; + case MEAN: + addToMeanAndSet(value, mean); + addToMeanAndSet(value, deltaMean); + break; + case MAX: + maxAndSet(value, aggregate); + maxAndSet(value, deltaAggregate); + break; + case MIN: + minAndSet(value, aggregate); + minAndSet(value, deltaAggregate); + break; + default: + throw illegalArgumentException(); + } + return this; + } finally { + setDirty(); } - return this; } private void addToMeanAndSet(int value, AtomicReference target) { @@ -1101,28 +1137,34 @@ public Integer getAndResetDelta() { @Override public Counter resetToValue(Integer value) { - setDirty(); - if (kind == MEAN) { - throw illegalArgumentException(); + try { + if (kind == MEAN) { + throw illegalArgumentException(); + } + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); } - aggregate.set(value); - deltaAggregate.set(value); - return this; } @Override public Counter resetMeanToValue(long elementCount, Integer value) { - setDirty(); - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); + try { + if (kind != MEAN) { + throw illegalArgumentException(); + } + if (elementCount < 0) { + throw new IllegalArgumentException("elementCount must be non-negative"); + } + IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount); + mean.set(counterMean); + deltaMean.set(counterMean); + return this; + } finally { + setDirty(); } - IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; } @Override @@ -1153,21 +1195,24 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { - setDirty(); - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean thisCounterMean = this.getMean(); - CounterMean thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); + try { + checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + switch (kind) { + case SUM: + case MIN: + case MAX: + return addValue(that.getAggregate()); + case MEAN: + CounterMean thisCounterMean = this.getMean(); + CounterMean thatCounterMean = that.getMean(); + return resetMeanToValue( + thisCounterMean.getCount() + thatCounterMean.getCount(), + thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); + default: + throw illegalArgumentException(); + } + } finally { + setDirty(); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java index 3bf78ca0d97e..fb002def3d76 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java @@ -29,8 +29,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.apache.beam.sdk.util.common.Counter.CommitState; import org.apache.beam.sdk.util.common.Counter.CounterMean; -import org.apache.beam.sdk.util.common.Counter.DirtyBit; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -608,13 +608,13 @@ public void testDirtyBit() { assertFalse(intMean.isDirty()); assertFalse(boolAnd.isDirty()); - assertEquals(DirtyBit.COMMITTED, longSum.dirty.get()); - assertEquals(DirtyBit.COMMITTED, longMean.dirty.get()); - assertEquals(DirtyBit.COMMITTED, doubleSum.dirty.get()); - assertEquals(DirtyBit.COMMITTED, doubleMean.dirty.get()); - assertEquals(DirtyBit.COMMITTED, intSum.dirty.get()); - assertEquals(DirtyBit.COMMITTED, intMean.dirty.get()); - assertEquals(DirtyBit.COMMITTED, boolAnd.dirty.get()); + assertEquals(CommitState.COMMITTED, longSum.commitState.get()); + assertEquals(CommitState.COMMITTED, longMean.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleSum.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleMean.commitState.get()); + assertEquals(CommitState.COMMITTED, intSum.commitState.get()); + assertEquals(CommitState.COMMITTED, intMean.commitState.get()); + assertEquals(CommitState.COMMITTED, boolAnd.commitState.get()); // Test counters are dirty after mutating. longSum.addValue(1L); @@ -633,22 +633,22 @@ public void testDirtyBit() { assertTrue(intMean.isDirty()); assertTrue(boolAnd.isDirty()); - assertEquals(DirtyBit.DIRTY, longSum.dirty.get()); - assertEquals(DirtyBit.DIRTY, longMean.dirty.get()); - assertEquals(DirtyBit.DIRTY, doubleSum.dirty.get()); - assertEquals(DirtyBit.DIRTY, doubleMean.dirty.get()); - assertEquals(DirtyBit.DIRTY, intSum.dirty.get()); - assertEquals(DirtyBit.DIRTY, intMean.dirty.get()); - assertEquals(DirtyBit.DIRTY, boolAnd.dirty.get()); + assertEquals(CommitState.DIRTY, longSum.commitState.get()); + assertEquals(CommitState.DIRTY, longMean.commitState.get()); + assertEquals(CommitState.DIRTY, doubleSum.commitState.get()); + assertEquals(CommitState.DIRTY, doubleMean.commitState.get()); + assertEquals(CommitState.DIRTY, intSum.commitState.get()); + assertEquals(CommitState.DIRTY, intMean.commitState.get()); + assertEquals(CommitState.DIRTY, boolAnd.commitState.get()); // Test counters are dirty and are COMMITTING. - longSum.committing(); - longMean.committing(); - doubleSum.committing(); - doubleMean.committing(); - intSum.committing(); - intMean.committing(); - boolAnd.committing(); + assertTrue(longSum.committing()); + assertTrue(longMean.committing()); + assertTrue(doubleSum.committing()); + assertTrue(doubleMean.committing()); + assertTrue(intSum.committing()); + assertTrue(intMean.committing()); + assertTrue(boolAnd.committing()); assertTrue(longSum.isDirty()); assertTrue(longMean.isDirty()); @@ -658,13 +658,13 @@ public void testDirtyBit() { assertTrue(intMean.isDirty()); assertTrue(boolAnd.isDirty()); - assertEquals(DirtyBit.COMMITTING, longSum.dirty.get()); - assertEquals(DirtyBit.COMMITTING, longMean.dirty.get()); - assertEquals(DirtyBit.COMMITTING, doubleSum.dirty.get()); - assertEquals(DirtyBit.COMMITTING, doubleMean.dirty.get()); - assertEquals(DirtyBit.COMMITTING, intSum.dirty.get()); - assertEquals(DirtyBit.COMMITTING, intMean.dirty.get()); - assertEquals(DirtyBit.COMMITTING, boolAnd.dirty.get()); + assertEquals(CommitState.COMMITTING, longSum.commitState.get()); + assertEquals(CommitState.COMMITTING, longMean.commitState.get()); + assertEquals(CommitState.COMMITTING, doubleSum.commitState.get()); + assertEquals(CommitState.COMMITTING, doubleMean.commitState.get()); + assertEquals(CommitState.COMMITTING, intSum.commitState.get()); + assertEquals(CommitState.COMMITTING, intMean.commitState.get()); + assertEquals(CommitState.COMMITTING, boolAnd.commitState.get()); // Test counters are dirty again after mutating. longSum.addValue(1L); @@ -675,13 +675,13 @@ public void testDirtyBit() { intMean.resetMeanToValue(1, 1); boolAnd.addValue(true); - longSum.committed(); - longMean.committed(); - doubleSum.committed(); - doubleMean.committed(); - intSum.committed(); - intMean.committed(); - boolAnd.committed(); + assertFalse(longSum.committed()); + assertFalse(longMean.committed()); + assertFalse(doubleSum.committed()); + assertFalse(doubleMean.committed()); + assertFalse(intSum.committed()); + assertFalse(intMean.committed()); + assertFalse(boolAnd.committed()); assertTrue(longSum.isDirty()); assertTrue(longMean.isDirty()); @@ -691,30 +691,30 @@ public void testDirtyBit() { assertTrue(intMean.isDirty()); assertTrue(boolAnd.isDirty()); - assertEquals(DirtyBit.DIRTY, longSum.dirty.get()); - assertEquals(DirtyBit.DIRTY, longMean.dirty.get()); - assertEquals(DirtyBit.DIRTY, doubleSum.dirty.get()); - assertEquals(DirtyBit.DIRTY, doubleMean.dirty.get()); - assertEquals(DirtyBit.DIRTY, intSum.dirty.get()); - assertEquals(DirtyBit.DIRTY, intMean.dirty.get()); - assertEquals(DirtyBit.DIRTY, boolAnd.dirty.get()); + assertEquals(CommitState.DIRTY, longSum.commitState.get()); + assertEquals(CommitState.DIRTY, longMean.commitState.get()); + assertEquals(CommitState.DIRTY, doubleSum.commitState.get()); + assertEquals(CommitState.DIRTY, doubleMean.commitState.get()); + assertEquals(CommitState.DIRTY, intSum.commitState.get()); + assertEquals(CommitState.DIRTY, intMean.commitState.get()); + assertEquals(CommitState.DIRTY, boolAnd.commitState.get()); // Test counters are not dirty and are COMMITTED. - longSum.committing(); - longMean.committing(); - doubleSum.committing(); - doubleMean.committing(); - intSum.committing(); - intMean.committing(); - boolAnd.committing(); - - longSum.committed(); - longMean.committed(); - doubleSum.committed(); - doubleMean.committed(); - intSum.committed(); - intMean.committed(); - boolAnd.committed(); + assertTrue(longSum.committing()); + assertTrue(longMean.committing()); + assertTrue(doubleSum.committing()); + assertTrue(doubleMean.committing()); + assertTrue(intSum.committing()); + assertTrue(intMean.committing()); + assertTrue(boolAnd.committing()); + + assertTrue(longSum.committed()); + assertTrue(longMean.committed()); + assertTrue(doubleSum.committed()); + assertTrue(doubleMean.committed()); + assertTrue(intSum.committed()); + assertTrue(intMean.committed()); + assertTrue(boolAnd.committed()); assertFalse(longSum.isDirty()); assertFalse(longMean.isDirty()); @@ -724,12 +724,12 @@ public void testDirtyBit() { assertFalse(intMean.isDirty()); assertFalse(boolAnd.isDirty()); - assertEquals(DirtyBit.COMMITTED, longSum.dirty.get()); - assertEquals(DirtyBit.COMMITTED, longMean.dirty.get()); - assertEquals(DirtyBit.COMMITTED, doubleSum.dirty.get()); - assertEquals(DirtyBit.COMMITTED, doubleMean.dirty.get()); - assertEquals(DirtyBit.COMMITTED, intSum.dirty.get()); - assertEquals(DirtyBit.COMMITTED, intMean.dirty.get()); - assertEquals(DirtyBit.COMMITTED, boolAnd.dirty.get()); + assertEquals(CommitState.COMMITTED, longSum.commitState.get()); + assertEquals(CommitState.COMMITTED, longMean.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleSum.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleMean.commitState.get()); + assertEquals(CommitState.COMMITTED, intSum.commitState.get()); + assertEquals(CommitState.COMMITTED, intMean.commitState.get()); + assertEquals(CommitState.COMMITTED, boolAnd.commitState.get()); } } From b7bd50c4b3ce6a4b34dd640ce93aeb054857d5d4 Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 20 Apr 2016 16:35:59 -0700 Subject: [PATCH 3/6] fixup warnings --- .../org/apache/beam/sdk/util/common/Counter.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java index 4a9506c77ed9..c75ec8b86d43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java @@ -609,7 +609,8 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { try { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); switch (kind) { case SUM: case MIN: @@ -820,7 +821,8 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { try { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); switch (kind) { case SUM: case MIN: @@ -945,7 +947,8 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { try { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); return addValue(that.getAggregate()); } finally { setDirty(); @@ -1196,7 +1199,8 @@ public CounterMean getMean() { @Override public Counter merge(Counter that) { try { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); switch (kind) { case SUM: case MIN: From 89736464e6431be7806e593c0effea334e735f31 Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 22 Apr 2016 14:48:51 -0700 Subject: [PATCH 4/6] fixup: address comments --- .../apache/beam/sdk/util/common/Counter.java | 62 ++++++++++++------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java index c75ec8b86d43..56b3ed7f7786 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java @@ -296,49 +296,69 @@ public static interface CounterMean { public abstract CounterMean getMean(); /** - * CommitState represents whether counters' values have been committed to the backend. + * Represents whether counters' values have been committed to the backend. * *

Runners can use this information to optimize counters updates. * For example, if counters are committed, runners may choose to skip the updates. * *

Counters' state transition table: + * {@code * Action\Current State COMMITTED DIRTY COMMITTING - * add() DIRTY DIRTY DIRTY + * addValue() DIRTY DIRTY DIRTY * committing() None COMMITTING None * committed() None None COMMITTED + * } */ @VisibleForTesting enum CommitState { + /** + * There are no local updates that haven't been committed to the backend. + */ COMMITTED, + /** + * There are local updates that haven't been committed to the backend. + */ DIRTY, + /** + * Local updates are committing to the backend, but are still pending. + */ COMMITTING, } /** * Returns if the counter contains non-committed aggregate. */ - public boolean isDirty() { + public synchronized boolean isDirty() { return commitState.get() != CommitState.COMMITTED; } /** * Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}. + * + * @return true if successful. False return indicates that the commit state + * was not in {@code CommitState.DIRTY}. */ - public boolean committing() { + public synchronized boolean committing() { return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING); } /** * Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}. + * + * @return true if successful. + * + *

False return indicates that the counter was updated while the committing is pending. + * That counter update might or might not has been committed. The {@code commitState} has to + * stay in {@code CommitState.DIRTY}. */ - public boolean committed() { + public synchronized boolean committed() { return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED); } /** * Sets the counter to {@code CommitState.DIRTY}. * - *

It needs to be called at the beginning of {@link #addValue}, {@link #resetToValue}, + *

Must be called at the end of {@link #addValue}, {@link #resetToValue}, * {@link #resetMeanToValue}, and {@link #merge}. */ protected void setDirty() { @@ -479,7 +499,7 @@ private LongCounter(CounterName name, AggregationKind kind) { } @Override - public LongCounter addValue(Long value) { + public synchronized LongCounter addValue(Long value) { try { switch (kind) { case SUM: @@ -558,7 +578,7 @@ public Long getAndResetDelta() { } @Override - public Counter resetToValue(Long value) { + public synchronized Counter resetToValue(Long value) { try { if (kind == MEAN) { throw illegalArgumentException(); @@ -572,7 +592,7 @@ public Counter resetToValue(Long value) { } @Override - public Counter resetMeanToValue(long elementCount, Long value) { + public synchronized Counter resetMeanToValue(long elementCount, Long value) { try { if (kind != MEAN) { throw illegalArgumentException(); @@ -607,7 +627,7 @@ public CounterMean getMean() { } @Override - public Counter merge(Counter that) { + public synchronized Counter merge(Counter that) { try { checkArgument( this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); @@ -691,7 +711,7 @@ private DoubleCounter(CounterName name, AggregationKind kind) { } @Override - public DoubleCounter addValue(Double value) { + public synchronized DoubleCounter addValue(Double value) { try { switch (kind) { case SUM: @@ -761,7 +781,7 @@ public Double getAndResetDelta() { } @Override - public Counter resetToValue(Double value) { + public synchronized Counter resetToValue(Double value) { try { if (kind == MEAN) { throw illegalArgumentException(); @@ -775,7 +795,7 @@ public Counter resetToValue(Double value) { } @Override - public Counter resetMeanToValue(long elementCount, Double value) { + public synchronized Counter resetMeanToValue(long elementCount, Double value) { try { if (kind != MEAN) { throw illegalArgumentException(); @@ -819,7 +839,7 @@ public CounterMean getMean() { } @Override - public Counter merge(Counter that) { + public synchronized Counter merge(Counter that) { try { checkArgument( this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); @@ -885,7 +905,7 @@ private BooleanCounter(CounterName name, AggregationKind kind) { } @Override - public BooleanCounter addValue(Boolean value) { + public synchronized BooleanCounter addValue(Boolean value) { try { if (kind.equals(AND) && !value) { aggregate.set(value); @@ -913,7 +933,7 @@ public Boolean getAndResetDelta() { } @Override - public Counter resetToValue(Boolean value) { + public synchronized Counter resetToValue(Boolean value) { try { aggregate.set(value); deltaAggregate.set(value); @@ -945,7 +965,7 @@ public CounterMean getMean() { } @Override - public Counter merge(Counter that) { + public synchronized Counter merge(Counter that) { try { checkArgument( this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); @@ -1069,7 +1089,7 @@ private IntegerCounter(CounterName name, AggregationKind kind) { } @Override - public IntegerCounter addValue(Integer value) { + public synchronized IntegerCounter addValue(Integer value) { try { switch (kind) { case SUM: @@ -1139,7 +1159,7 @@ public Integer getAndResetDelta() { } @Override - public Counter resetToValue(Integer value) { + public synchronized Counter resetToValue(Integer value) { try { if (kind == MEAN) { throw illegalArgumentException(); @@ -1153,7 +1173,7 @@ public Counter resetToValue(Integer value) { } @Override - public Counter resetMeanToValue(long elementCount, Integer value) { + public synchronized Counter resetMeanToValue(long elementCount, Integer value) { try { if (kind != MEAN) { throw illegalArgumentException(); @@ -1197,7 +1217,7 @@ public CounterMean getMean() { } @Override - public Counter merge(Counter that) { + public synchronized Counter merge(Counter that) { try { checkArgument( this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); From 9f05935d553d6297c13e78227ebdce99030f21b2 Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 25 Apr 2016 14:14:48 -0700 Subject: [PATCH 5/6] fixed up: address feedback --- .../apache/beam/sdk/util/common/Counter.java | 44 +++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java index 56b3ed7f7786..02551517fcd0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java @@ -327,8 +327,16 @@ enum CommitState { /** * Returns if the counter contains non-committed aggregate. + * + *

After all possible mutations have completed, the reader should check + * {@link #isDirty} for every counter, otherwise updates may be lost. + * + *

A counter may become dirty without a corresponding update to the value. + * This generally will occur when the calls to {@code addValue()}, {@code committing()}, + * and {@code committed()} are interleaved such that the value is updated + * between the calls to committing and the read of the value. */ - public synchronized boolean isDirty() { + public boolean isDirty() { return commitState.get() != CommitState.COMMITTED; } @@ -338,7 +346,7 @@ public synchronized boolean isDirty() { * @return true if successful. False return indicates that the commit state * was not in {@code CommitState.DIRTY}. */ - public synchronized boolean committing() { + public boolean committing() { return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING); } @@ -351,7 +359,7 @@ public synchronized boolean committing() { * That counter update might or might not has been committed. The {@code commitState} has to * stay in {@code CommitState.DIRTY}. */ - public synchronized boolean committed() { + public boolean committed() { return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED); } @@ -499,7 +507,7 @@ private LongCounter(CounterName name, AggregationKind kind) { } @Override - public synchronized LongCounter addValue(Long value) { + public LongCounter addValue(Long value) { try { switch (kind) { case SUM: @@ -578,7 +586,7 @@ public Long getAndResetDelta() { } @Override - public synchronized Counter resetToValue(Long value) { + public Counter resetToValue(Long value) { try { if (kind == MEAN) { throw illegalArgumentException(); @@ -592,7 +600,7 @@ public synchronized Counter resetToValue(Long value) { } @Override - public synchronized Counter resetMeanToValue(long elementCount, Long value) { + public Counter resetMeanToValue(long elementCount, Long value) { try { if (kind != MEAN) { throw illegalArgumentException(); @@ -627,7 +635,7 @@ public CounterMean getMean() { } @Override - public synchronized Counter merge(Counter that) { + public Counter merge(Counter that) { try { checkArgument( this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); @@ -711,7 +719,7 @@ private DoubleCounter(CounterName name, AggregationKind kind) { } @Override - public synchronized DoubleCounter addValue(Double value) { + public DoubleCounter addValue(Double value) { try { switch (kind) { case SUM: @@ -781,7 +789,7 @@ public Double getAndResetDelta() { } @Override - public synchronized Counter resetToValue(Double value) { + public Counter resetToValue(Double value) { try { if (kind == MEAN) { throw illegalArgumentException(); @@ -795,7 +803,7 @@ public synchronized Counter resetToValue(Double value) { } @Override - public synchronized Counter resetMeanToValue(long elementCount, Double value) { + public Counter resetMeanToValue(long elementCount, Double value) { try { if (kind != MEAN) { throw illegalArgumentException(); @@ -839,7 +847,7 @@ public CounterMean getMean() { } @Override - public synchronized Counter merge(Counter that) { + public Counter merge(Counter that) { try { checkArgument( this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); @@ -905,7 +913,7 @@ private BooleanCounter(CounterName name, AggregationKind kind) { } @Override - public synchronized BooleanCounter addValue(Boolean value) { + public BooleanCounter addValue(Boolean value) { try { if (kind.equals(AND) && !value) { aggregate.set(value); @@ -933,7 +941,7 @@ public Boolean getAndResetDelta() { } @Override - public synchronized Counter resetToValue(Boolean value) { + public Counter resetToValue(Boolean value) { try { aggregate.set(value); deltaAggregate.set(value); @@ -965,7 +973,7 @@ public CounterMean getMean() { } @Override - public synchronized Counter merge(Counter that) { + public Counter merge(Counter that) { try { checkArgument( this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); @@ -1089,7 +1097,7 @@ private IntegerCounter(CounterName name, AggregationKind kind) { } @Override - public synchronized IntegerCounter addValue(Integer value) { + public IntegerCounter addValue(Integer value) { try { switch (kind) { case SUM: @@ -1159,7 +1167,7 @@ public Integer getAndResetDelta() { } @Override - public synchronized Counter resetToValue(Integer value) { + public Counter resetToValue(Integer value) { try { if (kind == MEAN) { throw illegalArgumentException(); @@ -1173,7 +1181,7 @@ public synchronized Counter resetToValue(Integer value) { } @Override - public synchronized Counter resetMeanToValue(long elementCount, Integer value) { + public Counter resetMeanToValue(long elementCount, Integer value) { try { if (kind != MEAN) { throw illegalArgumentException(); @@ -1217,7 +1225,7 @@ public CounterMean getMean() { } @Override - public synchronized Counter merge(Counter that) { + public Counter merge(Counter that) { try { checkArgument( this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); From 2f9f4dcff04498e961ac4dfe4b735e186cbedb1f Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 25 Apr 2016 15:38:29 -0700 Subject: [PATCH 6/6] fixup: address feedback --- .../org/apache/beam/sdk/util/common/Counter.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java index 02551517fcd0..9f9b0c1fbb8e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java @@ -45,6 +45,14 @@ *

Counters compare using value equality of their name, kind, and * cumulative value. Equal counters should have equal toString()s. * + *

After all possible mutations have completed, the reader should check + * {@link #isDirty} for every counter, otherwise updates may be lost. + * + *

A counter may become dirty without a corresponding update to the value. + * This generally will occur when the calls to {@code addValue()}, {@code committing()}, + * and {@code committed()} are interleaved such that the value is updated + * between the calls to committing and the read of the value. + * * @param the type of values aggregated by this counter */ public abstract class Counter { @@ -327,14 +335,6 @@ enum CommitState { /** * Returns if the counter contains non-committed aggregate. - * - *

After all possible mutations have completed, the reader should check - * {@link #isDirty} for every counter, otherwise updates may be lost. - * - *

A counter may become dirty without a corresponding update to the value. - * This generally will occur when the calls to {@code addValue()}, {@code committing()}, - * and {@code committed()} are interleaved such that the value is updated - * between the calls to committing and the read of the value. */ public boolean isDirty() { return commitState.get() != CommitState.COMMITTED;