() {
+ @Override
+ public void encode(Event value, OutputStream outStream,
+ com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ if (value.newPerson != null) {
+ INT_CODER.encode(0, outStream, Context.NESTED);
+ Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
+ } else if (value.newAuction != null) {
+ INT_CODER.encode(1, outStream, Context.NESTED);
+ Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
+ } else if (value.bid != null) {
+ INT_CODER.encode(2, outStream, Context.NESTED);
+ Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+
+ @Override
+ public Event decode(
+ InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ int tag = INT_CODER.decode(inStream, context);
+ if (tag == 0) {
+ Person person = Person.CODER.decode(inStream, Context.NESTED);
+ return new Event(person);
+ } else if (tag == 1) {
+ Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+ return new Event(auction);
+ } else if (tag == 2) {
+ Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+ return new Event(bid);
+ } else {
+ throw new RuntimeException("invalid event encoding");
+ }
+ }
+ };
+
+ @Nullable
+ @org.apache.avro.reflect.Nullable
+ public final Person newPerson;
+
+ @Nullable
+ @org.apache.avro.reflect.Nullable
+ public final Auction newAuction;
+
+ @Nullable
+ @org.apache.avro.reflect.Nullable
+ public final Bid bid;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private Event() {
+ newPerson = null;
+ newAuction = null;
+ bid = null;
+ }
+
+ public Event(Person newPerson) {
+ this.newPerson = newPerson;
+ newAuction = null;
+ bid = null;
+ }
+
+ public Event(Auction newAuction) {
+ newPerson = null;
+ this.newAuction = newAuction;
+ bid = null;
+ }
+
+ public Event(Bid bid) {
+ newPerson = null;
+ newAuction = null;
+ this.bid = bid;
+ }
+
+ /**
+ * Return a copy of event which captures {@code annotation}.
+ * (Used for debugging).
+ */
+ public Event withAnnotation(String annotation) {
+ if (newPerson != null) {
+ return new Event(newPerson.withAnnotation(annotation));
+ } else if (newAuction != null) {
+ return new Event(newAuction.withAnnotation(annotation));
+ } else {
+ return new Event(bid.withAnnotation(annotation));
+ }
+ }
+
+ /**
+ * Does event have {@code annotation}? (Used for debugging.)
+ */
+ public boolean hasAnnotation(String annotation) {
+ if (newPerson != null) {
+ return newPerson.hasAnnotation(annotation);
+ } else if (newAuction != null) {
+ return newAuction.hasAnnotation(annotation);
+ } else {
+ return bid.hasAnnotation(annotation);
+ }
+ }
+
+ /**
+ * Remove {@code annotation} from event. (Used for debugging.)
+ */
+ public Event withoutAnnotation(String annotation) {
+ if (newPerson != null) {
+ return new Event(newPerson.withoutAnnotation(annotation));
+ } else if (newAuction != null) {
+ return new Event(newAuction.withoutAnnotation(annotation));
+ } else {
+ return new Event(bid.withoutAnnotation(annotation));
+ }
+ }
+
+ @Override
+ public long sizeInBytes() {
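+ // The leading 1 approximates the event-type tag byte written by the coder; the rest is the payload's size.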
+ if (newPerson != null) {
+ return 1 + newPerson.sizeInBytes();
+ } else if (newAuction != null) {
+ return 1 + newAuction.sizeInBytes();
+ } else if (bid != null) {
+ return 1 + bid.sizeInBytes();
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (newPerson != null) {
+ return newPerson.toString();
+ } else if (newAuction != null) {
+ return newAuction.toString();
+ } else if (bid != null) {
+ return bid.toString();
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/Generator.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/Generator.java
new file mode 100644
index 000000000000..1bb680dd0194
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/Generator.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.values.TimestampedValue;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
+ * most primary key/foreign key relations are correct. E.g. a {@link Bid} event will usually have
+ * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
+ *
+ * To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
+ * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
+ * (in microseconds). The event stream is thus fully deterministic and does not depend on
+ * wallclock time.
+ *
+ *
+ * The nested {@link Checkpoint} class implements
+ * {@link com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark}
+ * so that we can resume generating events from a saved snapshot.
+ */
+public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
+ /**
+ * Keep the number of categories small so the example queries will find results even with
+ * a small batch of events.
+ */
+ private static final int NUM_CATEGORIES = 5;
+
+ /** Smallest random string size. */
+ private static final int MIN_STRING_LENGTH = 3;
+
+ /**
+ * Keep the number of states small so that the example queries will find results even with
+ * a small batch of events.
+ */
+ private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
+
+ private static final List<String> US_CITIES =
+ Arrays.asList(
+ ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
+ .split(","));
+
+ private static final List<String> FIRST_NAMES =
+ Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
+
+ private static final List<String> LAST_NAMES =
+ Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
+
+ /**
+ * Number of yet-to-be-created people and auction ids allowed.
+ */
+ private static final int PERSON_ID_LEAD = 10;
+ private static final int AUCTION_ID_LEAD = 10;
+
+ /**
+ * The fraction of people/auctions which may be 'hot' sellers/bidders/auctions is one
+ * over these values.
+ */
+ private static final int HOT_AUCTION_RATIO = 100;
+ private static final int HOT_SELLER_RATIO = 100;
+ private static final int HOT_BIDDER_RATIO = 100;
+
+ /**
+ * Just enough state to be able to restore a generator back to where it was checkpointed.
+ */
+ public static class Checkpoint implements UnboundedSource.CheckpointMark {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ /** Coder for this class. */
+ public static final Coder<Checkpoint> CODER_INSTANCE =
+ new AtomicCoder<Checkpoint>() {
+ @Override
+ public void encode(
+ Checkpoint value,
+ OutputStream outStream,
+ com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.numEvents, outStream, Context.NESTED);
+ LONG_CODER.encode(value.wallclockBaseTime, outStream, Context.NESTED);
+ }
+
+ @Override
+ public Checkpoint decode(
+ InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ long numEvents = LONG_CODER.decode(inStream, Context.NESTED);
+ long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED);
+ return new Checkpoint(numEvents, wallclockBaseTime);
+ }
+ };
+
+ private long numEvents;
+ private long wallclockBaseTime;
+
+ private Checkpoint(long numEvents, long wallclockBaseTime) {
+ this.numEvents = numEvents;
+ this.wallclockBaseTime = wallclockBaseTime;
+ }
+
+ public Generator toGenerator(GeneratorConfig config) {
+ return new Generator(config, numEvents, wallclockBaseTime);
+ }
+
+ @Override
+ public void finalizeCheckpoint() throws IOException {
+ // Nothing to finalize.
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}",
+ numEvents, wallclockBaseTime);
+ }
+ }
+
+ /**
+ * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
+ * (arbitrary but stable) event hash order.
+ */
+ public static class NextEvent implements Comparable<NextEvent> {
+ /** When, in wallclock time, should this event be emitted? */
+ public final long wallclockTimestamp;
+
+ /** When, in event time, should this event be considered to have occurred? */
+ public final long eventTimestamp;
+
+ /** The event itself. */
+ public final Event event;
+
+ /** The minimum of this and all future event timestamps. */
+ public final long watermark;
+
+ public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
+ this.wallclockTimestamp = wallclockTimestamp;
+ this.eventTimestamp = eventTimestamp;
+ this.event = event;
+ this.watermark = watermark;
+ }
+
+ /**
+ * Return a deep clone of next event with delay added to wallclock timestamp and
+ * the event annotated as 'LATE'.
+ */
+ public NextEvent withDelay(long delayMs) {
+ return new NextEvent(
+ wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
+ }
+
+ @Override
+ public int compareTo(NextEvent other) {
+ int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
+ if (i != 0) {
+ return i;
+ }
+ return Integer.compare(event.hashCode(), other.event.hashCode());
+ }
+ }
+
+ /**
+ * Configuration to generate events against. Note that it may be replaced by a call to
+ * {@link #splitAtEventId}.
+ */
+ private GeneratorConfig config;
+
+ /** Number of events generated by this generator. */
+ private long numEvents;
+
+ /**
+ * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
+ */
+ private long wallclockBaseTime;
+
+ private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) {
+ Preconditions.checkNotNull(config);
+ this.config = config;
+ this.numEvents = numEvents;
+ this.wallclockBaseTime = wallclockBaseTime;
+ }
+
+ /**
+ * Create a fresh generator according to {@code config}.
+ */
+ public Generator(GeneratorConfig config) {
+ this(config, 0, -1);
+ }
+
+ /**
+ * Return a checkpoint for the current generator.
+ */
+ public Checkpoint toCheckpoint() {
+ return new Checkpoint(numEvents, wallclockBaseTime);
+ }
+
+ /**
+ * Return a deep clone of this generator.
+ */
+ @Override
+ public Generator clone() {
+ return new Generator(config.clone(), numEvents, wallclockBaseTime);
+ }
+
+ /**
+ * Return the current config for this generator. Note that configs may be replaced by {@link
+ * #splitAtEventId}.
+ */
+ public GeneratorConfig getCurrentConfig() {
+ return config;
+ }
+
+ /**
+ * Mutate this generator so that it will only generate events up to but not including
+ * {@code eventId}. Return a config to represent the events this generator will no longer yield.
+ * The generators will run on a serial timeline.
+ */
+ public GeneratorConfig splitAtEventId(long eventId) {
+ long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
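+ // Events numbered [config.getStartEventId(), eventId) stay with this generator; the rest move to the returned config.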
+ GeneratorConfig remainConfig = config.cloneWith(config.firstEventId,
+ config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
+ config = config.cloneWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
+ return remainConfig;
+ }
+
+ /**
+ * Return the next 'event id'. Though events don't have ids we can simulate them to
+ * help with bookkeeping.
+ */
+ public long getNextEventId() {
+ return config.firstEventId + config.nextAdjustedEventNumber(numEvents);
+ }
+
+ /**
+ * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
+ * due to generate a person.
+ */
+ private long lastBase0PersonId() {
+ long eventId = getNextEventId();
+ long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+ long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
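+ // For example, with proportions 1 person : 3 auctions : 46 bids (denominator 50), event id 103
+ // gives epoch 2 and offset 3, which maps back to person id 2 (base 0).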
+ if (offset >= GeneratorConfig.PERSON_PROPORTION) {
+ // About to generate an auction or bid.
+ // Go back to the last person generated in this epoch.
+ offset = GeneratorConfig.PERSON_PROPORTION - 1;
+ }
+ // About to generate a person.
+ return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
+ }
+
+ /**
+ * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
+ * due to generate an auction.
+ */
+ private long lastBase0AuctionId() {
+ long eventId = getNextEventId();
+ long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+ long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+ if (offset < GeneratorConfig.PERSON_PROPORTION) {
+ // About to generate a person.
+ // Go back to the last auction in the last epoch.
+ epoch--;
+ offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+ } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+ // About to generate a bid.
+ // Go back to the last auction generated in this epoch.
+ offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+ } else {
+ // About to generate an auction.
+ offset -= GeneratorConfig.PERSON_PROPORTION;
+ }
+ return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
+ }
+
+ /** Return a random US state. */
+ private static String nextUSState(Random random) {
+ return US_STATES.get(random.nextInt(US_STATES.size()));
+ }
+
+ /** Return a random US city. */
+ private static String nextUSCity(Random random) {
+ return US_CITIES.get(random.nextInt(US_CITIES.size()));
+ }
+
+ /** Return a random person name. */
+ private static String nextPersonName(Random random) {
+ return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
+ + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
+ }
+
+ /** Return a random string of up to {@code maxLength}. */
+ private static String nextString(Random random, int maxLength) {
+ int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
+ StringBuilder sb = new StringBuilder();
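+ // Roughly 1 in 13 characters is a space; after the trim below the result may be slightly shorter than len.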
+ while (len-- > 0) {
+ if (random.nextInt(13) == 0) {
+ sb.append(' ');
+ } else {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ }
+ return sb.toString().trim();
+ }
+
+ /** Return a random string of exactly {@code length}. */
+ private static String nextExactString(Random random, int length) {
+ StringBuilder sb = new StringBuilder();
+ while (length-- > 0) {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ return sb.toString();
+ }
+
+ /** Return a random email address. */
+ private static String nextEmail(Random random) {
+ return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
+ }
+
+ /** Return a random credit card number. */
+ private static String nextCreditCard(Random random) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 4; i++) {
+ if (i > 0) {
+ sb.append(' ');
+ }
+ sb.append(String.format("%04d", random.nextInt(10000)));
+ }
+ return sb.toString();
+ }
+
+ /** Return a random price. */
+ private static long nextPrice(Random random) {
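+ // Log-uniform over roughly [$1, $1,000,000], returned in cents.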
+ return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
+ }
+
+ /** Return a random time delay, in milliseconds, for length of auctions. */
+ private long nextAuctionLengthMs(Random random, long timestamp) {
+ // What's our current event number?
+ long currentEventNumber = config.nextAdjustedEventNumber(numEvents);
+ // How many events till we've generated numInFlightAuctions?
+ long numEventsForAuctions =
+ (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
+ / GeneratorConfig.AUCTION_PROPORTION;
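+ // E.g. with the default 100 in-flight auctions and proportions 1:3:46 this is 100 * 50 / 3 = 1666 events.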
+ // When will the auction numInFlightAuctions beyond now be generated?
+ long futureAuction =
+ config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
+ .getKey();
+ // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n",
+ // futureAuction - timestamp, numEventsForAuctions);
+ // Choose a length with average horizonMs.
+ long horizonMs = futureAuction - timestamp;
+ return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
+ }
+
+ /**
+ * Return a random string such that {@code currentSize + string.length()} is on average
+ * {@code desiredAverageSize}.
+ */
+ private static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
+ if (currentSize > desiredAverageSize) {
+ return "";
+ }
+ desiredAverageSize -= currentSize;
+ int delta = (int) Math.round(desiredAverageSize * 0.2);
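+ // Pad with a string whose length is uniform within roughly +/-20% of the remaining size budget.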
+ int minSize = desiredAverageSize - delta;
+ int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
+ return nextExactString(random, desiredSize);
+ }
+
+ /** Return a random long from {@code [0, n)}. */
+ private static long nextLong(Random random, long n) {
+ if (n < Integer.MAX_VALUE) {
+ return random.nextInt((int) n);
+ } else {
+ // TODO: Very skewed distribution! Bad!
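+ // (Math.abs(Long.MIN_VALUE) is still negative, and the modulo is biased towards
+ // smaller residues whenever n does not divide 2^63.)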
+ return Math.abs(random.nextLong()) % n;
+ }
+ }
+
+ /**
+ * Generate and return a random person with next available id.
+ */
+ private Person nextPerson(Random random, long timestamp) {
+ long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID;
+ String name = nextPersonName(random);
+ String email = nextEmail(random);
+ String creditCard = nextCreditCard(random);
+ String city = nextUSCity(random);
+ String state = nextUSState(random);
+ int currentSize =
+ 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
+ String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize);
+ return new Person(id, name, email, creditCard, city, state, timestamp, extra);
+ }
+
+ /**
+ * Return a random person id (base 0).
+ */
+ private long nextBase0PersonId(Random random) {
+ // Choose a random person from any of the 'active' people, plus a few 'leads'.
+ // By limiting to 'active' we ensure the density of bids or auctions per person
+ // does not decrease over time for long running jobs.
+ // By choosing a person id ahead of the last valid person id we will make
+ // newPerson and newAuction events appear to have been swapped in time.
+ long numPeople = lastBase0PersonId() + 1;
+ long activePeople = Math.min(numPeople, config.configuration.numActivePeople);
+ long n = nextLong(random, activePeople + PERSON_ID_LEAD);
+ return numPeople - activePeople + n;
+ }
+
+ /**
+ * Return a random auction id (base 0).
+ */
+ private long nextBase0AuctionId(Random random) {
+ // Choose a random auction for any of those which are likely to still be in flight,
+ // plus a few 'leads'.
+ // Note that ideally we'd track non-expired auctions exactly, but that state
+ // is difficult to split.
+ long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0);
+ long maxAuction = lastBase0AuctionId();
+ return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
+ }
+
+ /**
+ * Generate and return a random auction with next available id.
+ */
+ private Auction nextAuction(Random random, long timestamp) {
+ long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID;
+
+ long seller;
+ // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
+ if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
+ // Choose the first person in the batch of last HOT_SELLER_RATIO people.
+ seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
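+ // E.g. with HOT_SELLER_RATIO = 100 and lastBase0PersonId() = 1234 this picks person 1200.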
+ } else {
+ seller = nextBase0PersonId(random);
+ }
+ seller += GeneratorConfig.FIRST_PERSON_ID;
+
+ long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
+ long initialBid = nextPrice(random);
+ long dateTime = timestamp;
+ long expires = timestamp + nextAuctionLengthMs(random, timestamp);
+ String name = nextString(random, 20);
+ String desc = nextString(random, 100);
+ long reserve = initialBid + nextPrice(random);
+ int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
+ String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
+ return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category,
+ extra);
+ }
+
+ /**
+ * Generate and return a random bid with next available id.
+ */
+ private Bid nextBid(Random random, long timestamp) {
+ long auction;
+ // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
+ if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
+ // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
+ auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
+ } else {
+ auction = nextBase0AuctionId(random);
+ }
+ auction += GeneratorConfig.FIRST_AUCTION_ID;
+
+ long bidder;
+ // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
+ if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
+ // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
+ // last HOT_BIDDER_RATIO people.
+ bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
+ } else {
+ bidder = nextBase0PersonId(random);
+ }
+ bidder += GeneratorConfig.FIRST_PERSON_ID;
+
+ long price = nextPrice(random);
+ int currentSize = 8 + 8 + 8 + 8;
+ String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize);
+ return new Bid(auction, bidder, price, timestamp, extra);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return numEvents < config.maxEvents;
+ }
+
+ /**
+ * Return the next event. The outer timestamp is in wallclock time and corresponds to
+ * when the event should fire. The inner timestamp is in event-time and represents the
+ * time the event is purported to have taken place in the simulation.
+ */
+ public NextEvent nextEvent() {
+ if (wallclockBaseTime < 0) {
+ wallclockBaseTime = System.currentTimeMillis();
+ }
+ // When, in event time, we should generate the event. Monotonic.
+ long eventTimestamp =
+ config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey();
+ // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
+ // may have local jitter.
+ long adjustedEventTimestamp =
+ config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents))
+ .getKey();
+ // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
+ // the event timestamp.
+ long watermark =
+ config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents))
+ .getKey();
+ // When, in wallclock time, we should emit the event.
+ long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
+
+ // Seed the random number generator with the next 'event id'.
+ Random random = new Random(getNextEventId());
+ long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR;
+
+ Event event;
+ if (rem < GeneratorConfig.PERSON_PROPORTION) {
+ event = new Event(nextPerson(random, adjustedEventTimestamp));
+ } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+ event = new Event(nextAuction(random, adjustedEventTimestamp));
+ } else {
+ event = new Event(nextBid(random, adjustedEventTimestamp));
+ }
+
+ numEvents++;
+ return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
+ }
+
+ @Override
+ public TimestampedValue<Event> next() {
+ NextEvent next = nextEvent();
+ return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Return how many microseconds till we emit the next event.
+ */
+ public long currentInterEventDelayUs() {
+ return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents))
+ .getValue();
+ }
+
+ /**
+ * Return an estimate of fraction of output consumed.
+ */
+ public double getFractionConsumed() {
+ return (double) numEvents / config.maxEvents;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config,
+ numEvents, wallclockBaseTime);
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/GeneratorConfig.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/GeneratorConfig.java
new file mode 100644
index 000000000000..33635b71d313
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/GeneratorConfig.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.cloud.dataflow.sdk.values.KV;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
+ */
+class GeneratorConfig implements Serializable {
+ /**
+ * We start the ids at specific values to help ensure the queries find a match even on
+ * small synthesized dataset sizes.
+ */
+ public static final long FIRST_AUCTION_ID = 1000L;
+ public static final long FIRST_PERSON_ID = 1000L;
+ public static final long FIRST_CATEGORY_ID = 10L;
+
+ /**
+ * Proportions of people/auctions/bids to synthesize.
+ */
+ public static final int PERSON_PROPORTION = 1;
+ public static final int AUCTION_PROPORTION = 3;
+ public static final int BID_PROPORTION = 46;
+ public static final int PROPORTION_DENOMINATOR =
+ PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
+
+ /** Environment options. */
+ public final NexmarkConfiguration configuration;
+
+ /**
+ * Delay between events, in microseconds. If the array has more than one entry then
+ * the rate is changed every {@link #stepLengthSec}, and wraps around.
+ */
+ public final long[] interEventDelayUs;
+
+ /**
+ * Delay before changing the current inter-event delay.
+ */
+ public final long stepLengthSec;
+
+ /**
+ * Time for first event (ms since epoch).
+ */
+ public final long baseTime;
+
+ /**
+ * Event id of first event to be generated. Event ids are unique over all generators, and
+ * are used as a seed to generate each event's data.
+ */
+ public final long firstEventId;
+
+ /** Maximum number of events to generate. */
+ public final long maxEvents;
+
+ /**
+ * First event number. Generators running in parallel time may share the same event number,
+ * and the event number is used to determine the event timestamp.
+ */
+ public final long firstEventNumber;
+
+ /**
+ * True period of epoch in milliseconds. Derived from above.
+ * (I.e. the time to run through the cycle of all interEventDelayUs entries.)
+ */
+ public final long epochPeriodMs;
+
+ /**
+ * Number of events per epoch. Derived from above.
+ * (I.e. the number of events needed to run through the cycle of all interEventDelayUs entries.)
+ */
+ public final long eventsPerEpoch;
+
+ public GeneratorConfig(NexmarkConfiguration configuration, long baseTime, long firstEventId,
+ long maxEventsOrZero, long firstEventNumber) {
+ this.configuration = configuration;
+ this.interEventDelayUs = configuration.qpsShape.interEventDelayUs(
+ configuration.firstEventQps, configuration.nextEventQps, configuration.numEventGenerators);
+ this.stepLengthSec = configuration.qpsShape.stepLengthSec(configuration.qpsPeriodSec);
+ this.baseTime = baseTime;
+ this.firstEventId = firstEventId;
+ if (maxEventsOrZero == 0) {
+ // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
+ this.maxEvents =
+ Long.MAX_VALUE / (PROPORTION_DENOMINATOR
+ * Math.max(
+ Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
+ configuration.avgBidByteSize));
+ } else {
+ this.maxEvents = maxEventsOrZero;
+ }
+ this.firstEventNumber = firstEventNumber;
+
+ long eventsPerEpoch = 0;
+ long epochPeriodMs = 0;
+ if (interEventDelayUs.length > 1) {
+ for (int i = 0; i < interEventDelayUs.length; i++) {
+ long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
+ eventsPerEpoch += numEventsForThisCycle;
+ epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
+ }
+ }
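+ // With a constant rate (a single interEventDelayUs entry) both remain zero; epochs are never consulted in that case.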
+ this.eventsPerEpoch = eventsPerEpoch;
+ this.epochPeriodMs = epochPeriodMs;
+ }
+
+ /**
+ * Return a clone of this config.
+ */
+ @Override
+ public GeneratorConfig clone() {
+ return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+ }
+
+ /**
+ * Return clone of this config except with given parameters.
+ */
+ public GeneratorConfig cloneWith(long firstEventId, long maxEvents, long firstEventNumber) {
+ return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+ }
+
+ /**
+ * Split this config into {@code n} sub-configs with roughly equal number of
+ * possible events, but distinct value spaces. The generators will run on parallel timelines.
+ * This config should no longer be used.
+ */
+ public List<GeneratorConfig> split(int n) {
+ List<GeneratorConfig> results = new ArrayList<>();
+ if (n == 1) {
+ // No split required.
+ results.add(this);
+ } else {
+ long subMaxEvents = maxEvents / n;
+ long subFirstEventId = firstEventId;
+ for (int i = 0; i < n; i++) {
+ if (i == n - 1) {
+ // Don't lose any events to round-down.
+ subMaxEvents = maxEvents - subMaxEvents * (n - 1);
+ }
+ results.add(cloneWith(subFirstEventId, subMaxEvents, firstEventNumber));
+ subFirstEventId += subMaxEvents;
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Return an estimate of the bytes needed by {@code numEvents}.
+ */
+ public long estimatedBytesForEvents(long numEvents) {
+ long numPersons =
+ (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR;
+ long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
+ long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
+ return numPersons * configuration.avgPersonByteSize
+ + numAuctions * configuration.avgAuctionByteSize + numBids * configuration.avgBidByteSize;
+ }
+
+ /**
+ * Return an estimate of the byte-size of all events a generator for this config would yield.
+ */
+ public long getEstimatedSizeBytes() {
+ return estimatedBytesForEvents(maxEvents);
+ }
+
+ /**
+ * Return the first 'event id' which could be generated from this config. Though events don't
+ * have ids we can simulate them to help bookkeeping.
+ */
+ public long getStartEventId() {
+ return firstEventId + firstEventNumber;
+ }
+
+ /**
+ * Return one past the last 'event id' which could be generated from this config.
+ */
+ public long getStopEventId() {
+ return firstEventId + firstEventNumber + maxEvents;
+ }
+
+ /**
+ * Return the next event number for a generator which has so far emitted {@code numEvents}.
+ */
+ public long nextEventNumber(long numEvents) {
+ return firstEventNumber + numEvents;
+ }
+
+ /**
+ * Return the next event number for a generator which has so far emitted {@code numEvents},
+ * but adjusted to account for {@code outOfOrderGroupSize}.
+ */
+ public long nextAdjustedEventNumber(long numEvents) {
+ long n = configuration.outOfOrderGroupSize;
+ long eventNumber = nextEventNumber(numEvents);
+ long base = (eventNumber / n) * n;
+ long offset = (eventNumber * 953) % n;
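+ // 953 is prime, so this permutes the offsets within each group of size n (provided n is not a multiple of 953).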
+ return base + offset;
+ }
+
+ /**
+ * Return the event number whose event time will be a suitable watermark for
+ * a generator which has so far emitted {@code numEvents}.
+ */
+ public long nextEventNumberForWatermark(long numEvents) {
+ long n = configuration.outOfOrderGroupSize;
+ long eventNumber = nextEventNumber(numEvents);
+ return (eventNumber / n) * n;
+ }
+
+ /**
+ * What timestamp should the event with {@code eventNumber} have for this generator? And
+ * what inter-event delay (in microseconds) is current?
+ */
+ public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
+ if (interEventDelayUs.length == 1) {
+ long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
+ return KV.of(timestamp, interEventDelayUs[0]);
+ }
+
+ long epoch = eventNumber / eventsPerEpoch;
+ long n = eventNumber % eventsPerEpoch;
+ long offsetInEpochMs = 0;
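+ // Walk the qps steps of this epoch to find the one containing event n.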
+ for (int i = 0; i < interEventDelayUs.length; i++) {
+ long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
+ if (n < numEventsForThisCycle) {
+ long offsetInCycleUs = n * interEventDelayUs[i];
+ long timestamp =
+ baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
+ return KV.of(timestamp, interEventDelayUs[i]);
+ }
+ n -= numEventsForThisCycle;
+ offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
+ }
+ throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
+ }
+
+ @Override
+ public String toString() {
+ return String.format("GeneratorConfig{configuration:%s, baseTime:%d, firstEventId:%d, "
+ + "maxEvents:%d, firstEventNumber:%d, epochPeriodMs:%d, "
+ + "eventsPerEpoch:%d}",
+ configuration, baseTime, firstEventId, maxEvents, firstEventNumber, epochPeriodMs,
+ eventsPerEpoch);
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/IdNameReserve.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/IdNameReserve.java
new file mode 100644
index 000000000000..4049399c57f4
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/IdNameReserve.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result type of {@link Query8}.
+ */
+public class IdNameReserve implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+ public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() {
+ @Override
+ public void encode(IdNameReserve value, OutputStream outStream,
+ com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.id, outStream, Context.NESTED);
+ STRING_CODER.encode(value.name, outStream, Context.NESTED);
+ LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+ }
+
+ @Override
+ public IdNameReserve decode(
+ InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ long id = LONG_CODER.decode(inStream, Context.NESTED);
+ String name = STRING_CODER.decode(inStream, Context.NESTED);
+ long reserve = LONG_CODER.decode(inStream, Context.NESTED);
+ return new IdNameReserve(id, name, reserve);
+ }
+ };
+
+ @JsonProperty
+ public final long id;
+
+ @JsonProperty
+ public final String name;
+
+ /** Reserve price in cents. */
+ @JsonProperty
+ public final long reserve;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private IdNameReserve() {
+ id = 0;
+ name = null;
+ reserve = 0;
+ }
+
+ public IdNameReserve(long id, String name, long reserve) {
+ this.id = id;
+ this.name = name;
+ this.reserve = reserve;
+ }
+
+ @Override
+ public long sizeInBytes() {
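+ // Rough estimate: 8 bytes for id, the name's characters plus a 1-byte length, and 8 bytes for reserve.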
+ return 8 + name.length() + 1 + 8;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/KnownSize.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/KnownSize.java
new file mode 100644
index 000000000000..81b9a879f5b4
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/KnownSize.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+/**
+ * Interface for elements which can quickly estimate their encoded byte size.
+ */
+public interface KnownSize {
+ long sizeInBytes();
+}
+
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/Monitor.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/Monitor.java
new file mode 100644
index 000000000000..d3a17ad40fdc
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/Monitor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Max.MaxLongFn;
+import com.google.cloud.dataflow.sdk.transforms.Min.MinLongFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum.SumLongFn;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.io.Serializable;
+
+/**
+ * A monitor of elements with support for later retrieving their aggregators.
+ *
+ * @param <T> Type of element we are monitoring.
+ */
+public class Monitor<T extends KnownSize> implements Serializable {
+ private class MonitorDoFn extends DoFn<T, T> {
+ public final Aggregator<Long, Long> elementCounter =
+ createAggregator(counterNamePrefix + "_elements", new SumLongFn());
+ public final Aggregator<Long, Long> bytesCounter =
+ createAggregator(counterNamePrefix + "_bytes", new SumLongFn());
+ public final Aggregator<Long, Long> startTime =
+ createAggregator(counterNamePrefix + "_startTime", new MinLongFn());
+ public final Aggregator<Long, Long> endTime =
+ createAggregator(counterNamePrefix + "_endTime", new MaxLongFn());
+ public final Aggregator<Long, Long> startTimestamp =
+ createAggregator("startTimestamp", new MinLongFn());
+ public final Aggregator<Long, Long> endTimestamp =
+ createAggregator("endTimestamp", new MaxLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ elementCounter.addValue(1L);
+ bytesCounter.addValue(c.element().sizeInBytes());
+ long now = System.currentTimeMillis();
+ startTime.addValue(now);
+ endTime.addValue(now);
+ startTimestamp.addValue(c.timestamp().getMillis());
+ endTimestamp.addValue(c.timestamp().getMillis());
+ c.output(c.element());
+ }
+ }
+
+ final MonitorDoFn doFn;
+ final PTransform<PCollection<T>, PCollection<T>> transform;
+ private String counterNamePrefix;
+
+ public Monitor(String name, String counterNamePrefix) {
+ this.counterNamePrefix = counterNamePrefix;
+ doFn = new MonitorDoFn();
+ transform = ParDo.named(name + ".Monitor").of(doFn);
+ }
+
+ public PTransform<PCollection<T>, PCollection<T>> getTransform() {
+ return transform;
+ }
+
+ public Aggregator<Long, Long> getElementCounter() {
+ return doFn.elementCounter;
+ }
+
+ public Aggregator<Long, Long> getBytesCounter() {
+ return doFn.bytesCounter;
+ }
+
+ public Aggregator<Long, Long> getStartTime() {
+ return doFn.startTime;
+ }
+
+ public Aggregator<Long, Long> getEndTime() {
+ return doFn.endTime;
+ }
+
+ public Aggregator<Long, Long> getStartTimestamp() {
+ return doFn.startTimestamp;
+ }
+
+ public Aggregator<Long, Long> getEndTimestamp() {
+ return doFn.endTimestamp;
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NameCityStateId.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NameCityStateId.java
new file mode 100644
index 000000000000..c16473c7ff85
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NameCityStateId.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link Query3}.
+ */
+public class NameCityStateId implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+ public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() {
+ @Override
+ public void encode(NameCityStateId value, OutputStream outStream,
+ com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ STRING_CODER.encode(value.name, outStream, Context.NESTED);
+ STRING_CODER.encode(value.city, outStream, Context.NESTED);
+ STRING_CODER.encode(value.state, outStream, Context.NESTED);
+ LONG_CODER.encode(value.id, outStream, Context.NESTED);
+ }
+
+ @Override
+ public NameCityStateId decode(
+ InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ String name = STRING_CODER.decode(inStream, Context.NESTED);
+ String city = STRING_CODER.decode(inStream, Context.NESTED);
+ String state = STRING_CODER.decode(inStream, Context.NESTED);
+ long id = LONG_CODER.decode(inStream, Context.NESTED);
+ return new NameCityStateId(name, city, state, id);
+ }
+ };
+
+ @JsonProperty
+ public final String name;
+
+ @JsonProperty
+ public final String city;
+
+ @JsonProperty
+ public final String state;
+
+ @JsonProperty
+ public final long id;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private NameCityStateId() {
+ name = null;
+ city = null;
+ state = null;
+ id = 0;
+ }
+
+ public NameCityStateId(String name, String city, String state, long id) {
+ this.name = name;
+ this.city = city;
+ this.state = state;
+ this.id = id;
+ }
+
+ @Override
+ public long sizeInBytes() {
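+ // Rough estimate: each string's characters plus a 1-byte length, and 8 bytes for the id.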
+ return name.length() + 1 + city.length() + 1 + state.length() + 1 + 8;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkConfiguration.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkConfiguration.java
new file mode 100644
index 000000000000..0ab230f65fcd
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkConfiguration.java
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Configuration controlling how a query is run. May be supplied by command line or
+ * programmatically. Some properties override those supplied by {@link DataflowPipelineOptions} (e.g.
+ * {@code isStreaming}). We try to capture everything which may influence the resulting
+ * pipeline performance, as captured by {@link NexmarkPerf}.
+ */
+class NexmarkConfiguration implements Serializable {
+ public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration();
+
+ /** Which query to run, in [0,9]. */
+ @JsonProperty
+ public int query = 0;
+
+ /** If true, emit query result as ERROR log entries. */
+ @JsonProperty
+ public boolean logResults = false;
+
+ /**
+ * If true, use {@link DataflowAssert} to assert that query results match a hand-written
+ * model. Only works if {@link #numEvents} is small.
+ */
+ @JsonProperty
+ public boolean assertCorrectness = false;
+
+ /** Where events come from. */
+ @JsonProperty
+ public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT;
+
+ /** Where results go to. */
+ @JsonProperty
+ public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL;
+
+ /**
+ * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated
+ * into the overall query pipeline.
+ */
+ @JsonProperty
+ public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED;
+
+ /**
+ * Number of events to generate. If zero, generate as many as possible without overflowing
+ * internal counters etc.
+ */
+ @JsonProperty
+ public long numEvents = 100000;
+
+ /**
+ * Number of event generators to use. Each generates events in its own timeline.
+ */
+ @JsonProperty
+ public int numEventGenerators = 100;
+
+ /**
+ * Shape of event qps curve.
+ */
+ @JsonProperty
+ public NexmarkUtils.QpsShape qpsShape = NexmarkUtils.QpsShape.SINE;
+
+ /**
+ * Initial overall event qps.
+ */
+ @JsonProperty
+ public int firstEventQps = 10000;
+
+ /**
+ * Next overall event qps.
+ */
+ @JsonProperty
+ public int nextEventQps = 10000;
+
+ /**
+ * Overall period of qps shape, in seconds.
+ */
+ @JsonProperty
+ public int qpsPeriodSec = 600;
+
+ /**
+ * Time in seconds to preload the subscription with data, at the initial input rate of the
+ * pipeline.
+ */
+ @JsonProperty
+ public int preloadSeconds = 0;
+
+ /**
+ * If true, and in streaming mode, generate events only when they are due according to their
+ * timestamp.
+ */
+ @JsonProperty
+ public boolean isRateLimited = false;
+
+ /**
+ * If true, use wallclock time as event time. Otherwise, use a deterministic
+ * time in the past so that multiple runs will see exactly the same event streams
+ * and should thus have exactly the same results.
+ */
+ @JsonProperty
+ public boolean useWallclockEventTime = false;
+
+ /** Average idealized size of a 'new person' event, in bytes. */
+ @JsonProperty
+ public int avgPersonByteSize = 200;
+
+ /** Average idealized size of a 'new auction' event, in bytes. */
+ @JsonProperty
+ public int avgAuctionByteSize = 500;
+
+ /** Average idealized size of a 'bid' event, in bytes. */
+ @JsonProperty
+ public int avgBidByteSize = 100;
+
+ /** Ratio of bids to 'hot' auctions compared to all other auctions. */
+ @JsonProperty
+ public int hotAuctionRatio = 1;
+
+ /** Ratio of auctions for 'hot' sellers compared to all other people. */
+ @JsonProperty
+ public int hotSellersRatio = 1;
+
+ /** Ratio of bids for 'hot' bidders compared to all other people. */
+ @JsonProperty
+ public int hotBiddersRatio = 1;
+
+ /** Window size, in seconds, for queries 3, 5, 7 and 8. */
+ @JsonProperty
+ public long windowSizeSec = 10;
+
+ /** Sliding window period, in seconds, for query 5. */
+ @JsonProperty
+ public long windowPeriodSec = 5;
+
+ /** Number of seconds to hold back events according to their reported timestamp. */
+ @JsonProperty
+ public long watermarkHoldbackSec = 0;
+
+ /** Average number of auctions which should be in flight at any time, per generator. */
+ @JsonProperty
+ public int numInFlightAuctions = 100;
+
+ /** Maximum number of people to consider as active for placing auctions or bids. */
+ @JsonProperty
+ public int numActivePeople = 1000;
+
+ /**
+ * If true, don't run the actual query. Instead, calculate the distribution
+ * of number of query results per (event time) minute according to the query model.
+ */
+ @JsonProperty
+ public boolean justModelResultRate = false;
+
+ /** Coder strategy to follow. */
+ @JsonProperty
+ public NexmarkUtils.CoderStrategy coderStrategy = NexmarkUtils.CoderStrategy.HAND;
+
+ /**
+ * Delay, in milliseconds, for each event. This will peg one core for this number
+ * of milliseconds to simulate CPU-bound computation.
+ */
+ @JsonProperty
+ public long cpuDelayMs = 0;
+
+ /**
+ * Extra data, in bytes, to save to persistent state for each event. This will force
+ * i/o all the way to durable storage to simulate an I/O-bound computation.
+ */
+ @JsonProperty
+ public long diskBusyBytes = 0;
+
+ /**
+ * Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction.
+ */
+ @JsonProperty
+ public int auctionSkip = 123;
+
+ /**
+ * Fanout for queries 4 (groups by category id), 5 and 7 (find a global maximum).
+ */
+ @JsonProperty
+ public int fanout = 5;
+
+ /**
+ * Length of occasional delay to impose on events (in seconds).
+ */
+ @JsonProperty
+ public long occasionalDelaySec = 0;
+
+ /**
+ * Probability that an event will be delayed by delayS.
+ */
+ @JsonProperty
+ public double probDelayedEvent = 0.0;
+
+ /**
+ * Maximum size of each log file (in events). For Query10 only.
+ */
+ @JsonProperty
+ public int maxLogEvents = 100_000;
+
+ /**
+ * If true, use pub/sub publish time instead of event time.
+ */
+ @JsonProperty
+ public boolean usePubsubPublishTime = false;
+
+ /**
+ * Number of events in out-of-order groups. 1 implies no out-of-order events. 1000 implies
+ * every 1000 events per generator are emitted in pseudo-random order.
+ */
+ @JsonProperty
+ public long outOfOrderGroupSize = 1;
+
+ /**
+ * In debug mode, include Monitor & Snoop.
+ */
+ @JsonProperty
+ public boolean debug = true;
+
+ /**
+ * Replace any properties of this configuration which have been supplied by the command line.
+ * However *never replace* isStreaming since we can't tell if it was supplied by the command line
+ * or merely has its default false value.
+ */
+ public void overrideFromOptions(Options options) {
+ if (options.getQuery() != null) {
+ query = options.getQuery();
+ }
+ if (options.getLogResults() != null) {
+ logResults = options.getLogResults();
+ }
+ if (options.getAssertCorrectness() != null) {
+ assertCorrectness = options.getAssertCorrectness();
+ }
+ if (options.getSourceType() != null) {
+ sourceType = options.getSourceType();
+ }
+ if (options.getSinkType() != null) {
+ sinkType = options.getSinkType();
+ }
+ if (options.getPubSubMode() != null) {
+ pubSubMode = options.getPubSubMode();
+ }
+ if (options.getNumEvents() != null) {
+ numEvents = options.getNumEvents();
+ }
+ if (options.getNumEventGenerators() != null) {
+ numEventGenerators = options.getNumEventGenerators();
+ }
+ if (options.getQpsShape() != null) {
+ qpsShape = options.getQpsShape();
+ }
+ if (options.getFirstEventQps() != null) {
+ firstEventQps = options.getFirstEventQps();
+ }
+ if (options.getNextEventQps() != null) {
+ nextEventQps = options.getNextEventQps();
+ }
+ if (options.getQpsPeriodSec() != null) {
+ qpsPeriodSec = options.getQpsPeriodSec();
+ }
+ if (options.getPreloadSeconds() != null) {
+ preloadSeconds = options.getPreloadSeconds();
+ }
+ if (options.getIsRateLimited() != null) {
+ isRateLimited = options.getIsRateLimited();
+ }
+ if (options.getUseWallclockEventTime() != null) {
+ useWallclockEventTime = options.getUseWallclockEventTime();
+ }
+ if (options.getAvgPersonByteSize() != null) {
+ avgPersonByteSize = options.getAvgPersonByteSize();
+ }
+ if (options.getAvgAuctionByteSize() != null) {
+ avgAuctionByteSize = options.getAvgAuctionByteSize();
+ }
+ if (options.getAvgBidByteSize() != null) {
+ avgBidByteSize = options.getAvgBidByteSize();
+ }
+ if (options.getHotAuctionRatio() != null) {
+ hotAuctionRatio = options.getHotAuctionRatio();
+ }
+ if (options.getHotSellersRatio() != null) {
+ hotSellersRatio = options.getHotSellersRatio();
+ }
+ if (options.getHotBiddersRatio() != null) {
+ hotBiddersRatio = options.getHotBiddersRatio();
+ }
+ if (options.getWindowSizeSec() != null) {
+ windowSizeSec = options.getWindowSizeSec();
+ }
+ if (options.getWindowPeriodSec() != null) {
+ windowPeriodSec = options.getWindowPeriodSec();
+ }
+ if (options.getWatermarkHoldbackSec() != null) {
+ watermarkHoldbackSec = options.getWatermarkHoldbackSec();
+ }
+ if (options.getNumInFlightAuctions() != null) {
+ numInFlightAuctions = options.getNumInFlightAuctions();
+ }
+ if (options.getNumActivePeople() != null) {
+ numActivePeople = options.getNumActivePeople();
+ }
+ if (options.getJustModelResultRate() != null) {
+ justModelResultRate = options.getJustModelResultRate();
+ }
+ if (options.getCoderStrategy() != null) {
+ coderStrategy = options.getCoderStrategy();
+ }
+ if (options.getCpuDelayMs() != null) {
+ cpuDelayMs = options.getCpuDelayMs();
+ }
+ if (options.getDiskBusyBytes() != null) {
+ diskBusyBytes = options.getDiskBusyBytes();
+ }
+ if (options.getAuctionSkip() != null) {
+ auctionSkip = options.getAuctionSkip();
+ }
+ if (options.getFanout() != null) {
+ fanout = options.getFanout();
+ }
+ if (options.getOccasionalDelaySec() != null) {
+ occasionalDelaySec = options.getOccasionalDelaySec();
+ }
+ if (options.getProbDelayedEvent() != null) {
+ probDelayedEvent = options.getProbDelayedEvent();
+ }
+ if (options.getMaxLogEvents() != null) {
+ maxLogEvents = options.getMaxLogEvents();
+ }
+ if (options.getUsePubsubPublishTime() != null) {
+ usePubsubPublishTime = options.getUsePubsubPublishTime();
+ }
+ if (options.getOutOfOrderGroupSize() != null) {
+ outOfOrderGroupSize = options.getOutOfOrderGroupSize();
+ }
+ if (options.getDebug() != null) {
+ debug = options.getDebug();
+ }
+ }
+
+ /**
+ * Return a clone of this configuration.
+ */
+ @Override
+ public NexmarkConfiguration clone() {
+ NexmarkConfiguration result = new NexmarkConfiguration();
+ result.query = query;
+ result.logResults = logResults;
+ result.assertCorrectness = assertCorrectness;
+ result.sourceType = sourceType;
+ result.sinkType = sinkType;
+ result.pubSubMode = pubSubMode;
+ result.numEvents = numEvents;
+ result.numEventGenerators = numEventGenerators;
+ result.qpsShape = qpsShape;
+ result.firstEventQps = firstEventQps;
+ result.nextEventQps = nextEventQps;
+ result.qpsPeriodSec = qpsPeriodSec;
+ result.preloadSeconds = preloadSeconds;
+ result.isRateLimited = isRateLimited;
+ result.useWallclockEventTime = useWallclockEventTime;
+ result.avgPersonByteSize = avgPersonByteSize;
+ result.avgAuctionByteSize = avgAuctionByteSize;
+ result.avgBidByteSize = avgBidByteSize;
+ result.hotAuctionRatio = hotAuctionRatio;
+ result.hotSellersRatio = hotSellersRatio;
+ result.hotBiddersRatio = hotBiddersRatio;
+ result.windowSizeSec = windowSizeSec;
+ result.windowPeriodSec = windowPeriodSec;
+ result.watermarkHoldbackSec = watermarkHoldbackSec;
+ result.numInFlightAuctions = numInFlightAuctions;
+ result.numActivePeople = numActivePeople;
+ result.justModelResultRate = justModelResultRate;
+ result.coderStrategy = coderStrategy;
+ result.cpuDelayMs = cpuDelayMs;
+ result.diskBusyBytes = diskBusyBytes;
+ result.auctionSkip = auctionSkip;
+ result.fanout = fanout;
+ result.occasionalDelaySec = occasionalDelaySec;
+ result.probDelayedEvent = probDelayedEvent;
+ result.maxLogEvents = maxLogEvents;
+ result.usePubsubPublishTime = usePubsubPublishTime;
+ result.outOfOrderGroupSize = outOfOrderGroupSize;
+ result.debug = debug;
+ return result;
+ }
+
+ /**
+ * Return short description of configuration (suitable for use in logging). We only render
+ * the core fields plus those which do not have default values.
+ */
+ public String toShortString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(String.format("query:%d", query));
+ if (sourceType != DEFAULT.sourceType) {
+ sb.append(String.format("; sourceType:%s", sourceType));
+ }
+ if (sinkType != DEFAULT.sinkType) {
+ sb.append(String.format("; sinkType:%s", sinkType));
+ }
+ if (pubSubMode != DEFAULT.pubSubMode) {
+ sb.append(String.format("; pubSubMode:%s", pubSubMode));
+ }
+ if (numEvents != DEFAULT.numEvents) {
+ sb.append(String.format("; numEvents:%d", numEvents));
+ }
+ if (numEventGenerators != DEFAULT.numEventGenerators) {
+ sb.append(String.format("; numEventGenerators:%d", numEventGenerators));
+ }
+ if (qpsShape != DEFAULT.qpsShape) {
+ sb.append(String.format("; qpsShare:%s", qpsShape));
+ }
+ if (firstEventQps != DEFAULT.firstEventQps || nextEventQps != DEFAULT.nextEventQps) {
+ sb.append(String.format("; firstEventQps:%d", firstEventQps));
+ sb.append(String.format("; nextEventQps:%d", nextEventQps));
+ }
+ if (qpsPeriodSec != DEFAULT.qpsPeriodSec) {
+ sb.append(String.format("; qpsPeriodSec:%d", qpsPeriodSec));
+ }
+ if (preloadSeconds != DEFAULT.preloadSeconds) {
+ sb.append(String.format("; preloadSeconds:%d", preloadSeconds));
+ }
+ if (isRateLimited != DEFAULT.isRateLimited) {
+ sb.append(String.format("; isRateLimited:%s", isRateLimited));
+ }
+ if (useWallclockEventTime != DEFAULT.useWallclockEventTime) {
+ sb.append(String.format("; useWallclockEventTime:%s", useWallclockEventTime));
+ }
+ if (avgPersonByteSize != DEFAULT.avgPersonByteSize) {
+ sb.append(String.format("; avgPersonByteSize:%d", avgPersonByteSize));
+ }
+ if (avgAuctionByteSize != DEFAULT.avgAuctionByteSize) {
+ sb.append(String.format("; avgAuctionByteSize:%d", avgAuctionByteSize));
+ }
+ if (avgBidByteSize != DEFAULT.avgBidByteSize) {
+ sb.append(String.format("; avgBidByteSize:%d", avgBidByteSize));
+ }
+ if (hotAuctionRatio != DEFAULT.hotAuctionRatio) {
+ sb.append(String.format("; hotAuctionRatio:%d", hotAuctionRatio));
+ }
+ if (hotSellersRatio != DEFAULT.hotSellersRatio) {
+ sb.append(String.format("; hotSellersRatio:%d", hotSellersRatio));
+ }
+ if (hotBiddersRatio != DEFAULT.hotBiddersRatio) {
+ sb.append(String.format("; hotBiddersRatio:%d", hotBiddersRatio));
+ }
+ if (windowSizeSec != DEFAULT.windowSizeSec) {
+ sb.append(String.format("; windowSizeSec:%d", windowSizeSec));
+ }
+ if (windowPeriodSec != DEFAULT.windowPeriodSec) {
+ sb.append(String.format("; windowPeriodSec:%d", windowPeriodSec));
+ }
+ if (watermarkHoldbackSec != DEFAULT.watermarkHoldbackSec) {
+ sb.append(String.format("; watermarkHoldbackSec:%d", watermarkHoldbackSec));
+ }
+ if (numInFlightAuctions != DEFAULT.numInFlightAuctions) {
+ sb.append(String.format("; numInFlightAuctions:%d", numInFlightAuctions));
+ }
+ if (numActivePeople != DEFAULT.numActivePeople) {
+ sb.append(String.format("; numActivePeople:%d", numActivePeople));
+ }
+ if (justModelResultRate != DEFAULT.justModelResultRate) {
+ sb.append(String.format("; justModelResutRate:%s", justModelResultRate));
+ }
+ if (coderStrategy != DEFAULT.coderStrategy) {
+ sb.append(String.format("; coderStrategy:%s", coderStrategy));
+ }
+ if (cpuDelayMs != DEFAULT.cpuDelayMs) {
+ sb.append(String.format("; cpuSlowdownMs:%d", cpuDelayMs));
+ }
+ if (diskBusyBytes != DEFAULT.diskBusyBytes) {
+ sb.append(String.format("; diskBuysBytes:%d", diskBusyBytes));
+ }
+ if (auctionSkip != DEFAULT.auctionSkip) {
+ sb.append(String.format("; auctionSkip:%d", auctionSkip));
+ }
+ if (fanout != DEFAULT.fanout) {
+ sb.append(String.format("; fanout:%d", fanout));
+ }
+ if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
+ sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
+ }
+ if (probDelayedEvent != DEFAULT.probDelayedEvent) {
+ sb.append(String.format("; probDelayedEvent:%f", probDelayedEvent));
+ }
+ if (maxLogEvents != DEFAULT.maxLogEvents) {
+ sb.append(String.format("; maxLogEvents:%d", maxLogEvents));
+ }
+ if (usePubsubPublishTime != DEFAULT.usePubsubPublishTime) {
+ sb.append(String.format("; usePubsubPublishTime:%s", usePubsubPublishTime));
+ }
+ if (outOfOrderGroupSize != DEFAULT.outOfOrderGroupSize) {
+ sb.append(String.format("; outOfOrderGroupSize:%d", outOfOrderGroupSize));
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Return full description as a string.
+ */
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Parse a {@link NexmarkConfiguration} from JSON {@code string}.
+ */
+ public static NexmarkConfiguration fromString(String string) {
+ try {
+ return NexmarkUtils.MAPPER.readValue(string, NexmarkConfiguration.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to parse nexmark configuration: ", e);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(query, logResults, assertCorrectness, sourceType, sinkType, pubSubMode,
+ numEvents, numEventGenerators, qpsShape, firstEventQps, nextEventQps,
+ qpsPeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize,
+ avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio,
+ windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople,
+ justModelResultRate, coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout,
+ occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime,
+ outOfOrderGroupSize);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ NexmarkConfiguration other = (NexmarkConfiguration) obj;
+ if (assertCorrectness != other.assertCorrectness) {
+ return false;
+ }
+ if (auctionSkip != other.auctionSkip) {
+ return false;
+ }
+ if (avgAuctionByteSize != other.avgAuctionByteSize) {
+ return false;
+ }
+ if (avgBidByteSize != other.avgBidByteSize) {
+ return false;
+ }
+ if (avgPersonByteSize != other.avgPersonByteSize) {
+ return false;
+ }
+ if (coderStrategy != other.coderStrategy) {
+ return false;
+ }
+ if (cpuDelayMs != other.cpuDelayMs) {
+ return false;
+ }
+ if (diskBusyBytes != other.diskBusyBytes) {
+ return false;
+ }
+ if (fanout != other.fanout) {
+ return false;
+ }
+ if (firstEventQps != other.firstEventQps) {
+ return false;
+ }
+ if (hotAuctionRatio != other.hotAuctionRatio) {
+ return false;
+ }
+ if (hotBiddersRatio != other.hotBiddersRatio) {
+ return false;
+ }
+ if (hotSellersRatio != other.hotSellersRatio) {
+ return false;
+ }
+ if (isRateLimited != other.isRateLimited) {
+ return false;
+ }
+ if (justModelResultRate != other.justModelResultRate) {
+ return false;
+ }
+ if (logResults != other.logResults) {
+ return false;
+ }
+ if (maxLogEvents != other.maxLogEvents) {
+ return false;
+ }
+ if (nextEventQps != other.nextEventQps) {
+ return false;
+ }
+ if (numEventGenerators != other.numEventGenerators) {
+ return false;
+ }
+ if (numEvents != other.numEvents) {
+ return false;
+ }
+ if (numInFlightAuctions != other.numInFlightAuctions) {
+ return false;
+ }
+ if (numActivePeople != other.numActivePeople) {
+ return false;
+ }
+ if (occasionalDelaySec != other.occasionalDelaySec) {
+ return false;
+ }
+ if (preloadSeconds != other.preloadSeconds) {
+ return false;
+ }
+ if (Double.doubleToLongBits(probDelayedEvent)
+ != Double.doubleToLongBits(other.probDelayedEvent)) {
+ return false;
+ }
+ if (pubSubMode != other.pubSubMode) {
+ return false;
+ }
+ if (qpsPeriodSec != other.qpsPeriodSec) {
+ return false;
+ }
+ if (qpsShape != other.qpsShape) {
+ return false;
+ }
+ if (query != other.query) {
+ return false;
+ }
+ if (sinkType != other.sinkType) {
+ return false;
+ }
+ if (sourceType != other.sourceType) {
+ return false;
+ }
+ if (useWallclockEventTime != other.useWallclockEventTime) {
+ return false;
+ }
+ if (watermarkHoldbackSec != other.watermarkHoldbackSec) {
+ return false;
+ }
+ if (windowPeriodSec != other.windowPeriodSec) {
+ return false;
+ }
+ if (windowSizeSec != other.windowSizeSec) {
+ return false;
+ }
+ if (usePubsubPublishTime != other.usePubsubPublishTime) {
+ return false;
+ }
+ if (outOfOrderGroupSize != other.outOfOrderGroupSize) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkDriver.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkDriver.java
new file mode 100644
index 000000000000..b1c9d82b971c
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkDriver.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of the 'NEXMark queries' for Google Dataflow.
+ * These are 11 queries over a three-table schema representing an online auction system:
+ *
+ * - {@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ *
+ * - {@link Auction} represents an item under auction.
+ *
+ * - {@link Bid} represents a bid for an item under auction.
+ *
+ * The queries exercise many aspects of streaming dataflow.
+ *
+ * We synthesize the creation of people, auctions and bids in real-time. The data is not
+ * particularly sensible.
+ *
+ * See
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/
+ */
+public class NexmarkDriver<OptionT extends Options> {
+
+ /**
+ * Entry point.
+ */
+ public void runAll(OptionT options, NexmarkRunner<OptionT> runner) {
+ Instant start = Instant.now();
+ Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
+ Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
+ Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
+
+ boolean successful = true;
+ try {
+ // Run all the configurations.
+ for (NexmarkConfiguration configuration : configurations) {
+ NexmarkPerf perf = runner.run(configuration);
+ if (perf != null) {
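+ // Treat a missing errors list, or any reported errors, as an unsuccessful run.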
+ if (perf.errors == null || perf.errors.size() > 0) {
+ successful = false;
+ }
+ appendPerf(options.getPerfFilename(), configuration, perf);
+ actual.put(configuration, perf);
+ // Summarize what we've run so far.
+ saveSummary(null, configurations, actual, baseline, start);
+ }
+ }
+ } finally {
+ if (options.getMonitorJobs()) {
+ // Report overall performance.
+ saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
+ saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
+ }
+ }
+
+ if (!successful) {
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Append the pair of {@code configuration} and {@code perf} to perf file.
+ */
+ private void appendPerf(
+ @Nullable String perfFilename, NexmarkConfiguration configuration,
+ NexmarkPerf perf) {
+ if (perfFilename == null) {
+ return;
+ }
+ List<String> lines = new ArrayList<>();
+ lines.add("");
+ lines.add(String.format("# %s", Instant.now()));
+ lines.add(String.format("# %s", configuration.toShortString()));
+ lines.add(configuration.toString());
+ lines.add(perf.toString());
+ try {
+ Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
+ StandardOpenOption.APPEND);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to write perf file: ", e);
+ }
+ NexmarkUtils.console("appended results to perf file %s.", perfFilename);
+ }
+
+ /**
+ * Load the baseline perf.
+ */
+ @Nullable
+ private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
+ @Nullable String baselineFilename) {
+ if (baselineFilename == null) {
+ return null;
+ }
+ Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
+ List<String> lines;
+ try {
+ lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to read baseline perf file: ", e);
+ }
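+ // The baseline file alternates configuration and perf lines; '#' comments and blanks are skipped.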
+ for (int i = 0; i < lines.size(); i++) {
+ if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) {
+ continue;
+ }
+ NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++));
+ NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i));
+ baseline.put(configuration, perf);
+ }
+ NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
+ baselineFilename);
+ return baseline;
+ }
+
+ private static final String LINE =
+ "==========================================================================================";
+
+ /**
+ * Print summary of {@code actual} vs (if non-null) {@code baseline}.
+ */
+ private static void saveSummary(
+ @Nullable String summaryFilename,
+ Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+ @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+ List<String> lines = new ArrayList<>();
+
+ lines.add("");
+ lines.add(LINE);
+
+ lines.add(
+ String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+ lines.add("");
+
+ lines.add("Default configuration:");
+ lines.add(NexmarkConfiguration.DEFAULT.toString());
+ lines.add("");
+
+ lines.add("Configurations:");
+ lines.add(" Conf Description");
+ int conf = 0;
+ for (NexmarkConfiguration configuration : configurations) {
+ lines.add(String.format(" %04d %s", conf++, configuration.toShortString()));
+ NexmarkPerf actualPerf = actual.get(configuration);
+ if (actualPerf != null && actualPerf.jobId != null) {
+ lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId));
+ }
+ }
+
+ lines.add("");
+ lines.add("Performance:");
+ lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)",
+ "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
+ conf = 0;
+ for (NexmarkConfiguration configuration : configurations) {
+ String line = String.format(" %04d ", conf++);
+ NexmarkPerf actualPerf = actual.get(configuration);
+ if (actualPerf == null) {
+ line += "*** not run ***";
+ } else {
+ NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
+ double runtimeSec = actualPerf.runtimeSec;
+ line += String.format("%12.1f ", runtimeSec);
+ if (baselinePerf == null) {
+ line += String.format("%12s ", "");
+ } else {
+ double baselineRuntimeSec = baselinePerf.runtimeSec;
+ double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0;
+ line += String.format("%+11.2f%% ", diff);
+ }
+
+ double eventsPerSec = actualPerf.eventsPerSec;
+ line += String.format("%12.1f ", eventsPerSec);
+ if (baselinePerf == null) {
+ line += String.format("%12s ", "");
+ } else {
+ double baselineEventsPerSec = baselinePerf.eventsPerSec;
+ double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0;
+ line += String.format("%+11.2f%% ", diff);
+ }
+
+ long numResults = actualPerf.numResults;
+ line += String.format("%12d ", numResults);
+ if (baselinePerf == null) {
+ line += String.format("%12s", "");
+ } else {
+ long baselineNumResults = baselinePerf.numResults;
+ long diff = numResults - baselineNumResults;
+ line += String.format("%+12d", diff);
+ }
+ }
+ lines.add(line);
+
+ if (actualPerf != null) {
+ List<String> errors = actualPerf.errors;
+ if (errors == null) {
+ errors = new ArrayList<>();
+ errors.add("NexmarkGoogleRunner returned null errors list");
+ }
+ for (String error : errors) {
+ lines.add(String.format(" %4s *** %s ***", "", error));
+ }
+ }
+ }
+
+ lines.add(LINE);
+ lines.add("");
+
+ for (String line : lines) {
+ System.out.println(line);
+ }
+
+ if (summaryFilename != null) {
+ try {
+ Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8,
+ StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to save summary file: ", e);
+ }
+ NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
+ }
+ }
+
+ /**
+ * Write all perf data and any baselines to a javascript file which can be used by
+ * graphing page etc.
+ */
+ private static void saveJavascript(
+ @Nullable String javascriptFilename,
+ Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+ @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+ if (javascriptFilename == null) {
+ return;
+ }
+
+ List<String> lines = new ArrayList<>();
+ lines.add(String.format(
+ "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+ lines.add("var all = [");
+
+ for (NexmarkConfiguration configuration : configurations) {
+ lines.add(" {");
+ lines.add(String.format(" config: %s", configuration));
+ NexmarkPerf actualPerf = actual.get(configuration);
+ if (actualPerf != null) {
+ lines.add(String.format(" ,perf: %s", actualPerf));
+ }
+ NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
+ if (baselinePerf != null) {
+ lines.add(String.format(" ,baseline: %s", baselinePerf));
+ }
+ lines.add(" },");
+ }
+
+ lines.add("];");
+
+ try {
+ Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8,
+ StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to save javascript file: ", e);
+ }
+ NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkGoogleDriver.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkGoogleDriver.java
new file mode 100644
index 000000000000..322ed2eff84d
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkGoogleDriver.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of the 'NEXMark queries' for Google Dataflow.
+ * These are 11 queries over a three-table schema representing an online auction system:
+ *
+ * - {@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ *
+ * - {@link Auction} represents an item under auction.
+ *
+ * - {@link Bid} represents a bid for an item under auction.
+ *
+ * The queries exercise many aspects of streaming dataflow.
+ *
+ * We synthesize the creation of people, auctions and bids in real-time. The data is not
+ * particularly sensible.
+ *
+ * See
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/
+ */
+ */
+class NexmarkGoogleDriver extends NexmarkDriver<NexmarkGoogleDriver.NexmarkGoogleOptions> {
+ /**
+ * Command line flags.
+ */
+ public interface NexmarkGoogleOptions extends Options, DataflowPipelineOptions {
+ @Description("If set, cancel running pipelines after this long")
+ @Nullable
+ Long getRunningTimeMinutes();
+
+ void setRunningTimeMinutes(Long value);
+
+ @Description("If set and --monitorJobs is true, check that the system watermark is never more "
+ + "than this far behind real time")
+ @Nullable
+ Long getMaxSystemLagSeconds();
+
+ void setMaxSystemLagSeconds(Long value);
+
+ @Description("If set and --monitorJobs is true, check that the data watermark is never more "
+ + "than this far behind real time")
+ @Nullable
+ Long getMaxDataLagSeconds();
+
+ void setMaxDataLagSeconds(Long value);
+
+ @Description("Only start validating watermarks after this many seconds")
+ @Nullable
+ Long getWatermarkValidationDelaySeconds();
+
+ void setWatermarkValidationDelaySeconds(Long value);
+ }
+
+ /**
+ * Entry point.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static void main(String[] args) throws IOException, InterruptedException {
+ // Gather command line args, baseline, configurations, etc.
+ NexmarkGoogleOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(NexmarkGoogleOptions.class);
+ NexmarkGoogleRunner runner = new NexmarkGoogleRunner(options);
+ new NexmarkGoogleDriver().runAll(options, runner);
+ }
+
+ /**
+ * Append the pair of {@code configuration} and {@code perf} to perf file.
+ *
+ * @throws IOException
+ */
+ private static void appendPerf(
+ @Nullable String perfFilename, NexmarkConfiguration configuration,
+ NexmarkPerf perf) throws IOException {
+ if (perfFilename == null) {
+ return;
+ }
+ List<String> lines = new ArrayList<>();
+ lines.add("");
+ lines.add(String.format("# %s", Instant.now()));
+ lines.add(String.format("# %s", configuration.toShortString()));
+ lines.add(configuration.toString());
+ lines.add(perf.toString());
+ Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
+ StandardOpenOption.APPEND);
+ NexmarkUtils.console("appended results to perf file %s.", perfFilename);
+ }
+
+ /**
+ * Load the baseline perf.
+ *
+ * @throws IOException
+ */
+ @Nullable
+ private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
+ @Nullable String baselineFilename) throws IOException {
+ if (baselineFilename == null) {
+ return null;
+ }
+ Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
+ List<String> lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
+ for (int i = 0; i < lines.size(); i++) {
+ if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) {
+ continue;
+ }
+ NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++));
+ NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i));
+ baseline.put(configuration, perf);
+ }
+ NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
+ baselineFilename);
+ return baseline;
+ }
+
+ private static final String LINE =
+ "==========================================================================================";
+
+ /**
+ * Print summary of {@code actual} vs (if non-null) {@code baseline}.
+ *
+ * @throws IOException
+ */
+ private static void saveSummary(
+ @Nullable String summaryFilename,
+ Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+ @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) throws IOException {
+ List<String> lines = new ArrayList<>();
+
+ lines.add("");
+ lines.add(LINE);
+
+ lines.add(
+ String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+ lines.add("");
+
+ lines.add("Default configuration:");
+ lines.add(NexmarkConfiguration.DEFAULT.toString());
+ lines.add("");
+
+ lines.add("Configurations:");
+ lines.add(" Conf Description");
+ int conf = 0;
+ for (NexmarkConfiguration configuration : configurations) {
+ lines.add(String.format(" %04d %s", conf++, configuration.toShortString()));
+ NexmarkPerf actualPerf = actual.get(configuration);
+ if (actualPerf != null && actualPerf.jobId != null) {
+ lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId));
+ }
+ }
+
+ lines.add("");
+ lines.add("Performance:");
+ lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)",
+ "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
+ conf = 0;
+ for (NexmarkConfiguration configuration : configurations) {
+ String line = String.format(" %04d ", conf++);
+ NexmarkPerf actualPerf = actual.get(configuration);
+ if (actualPerf == null) {
+ line += "*** not run ***";
+ } else {
+ NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
+ double runtimeSec = actualPerf.runtimeSec;
+ line += String.format("%12.1f ", runtimeSec);
+ if (baselinePerf == null) {
+ line += String.format("%12s ", "");
+ } else {
+ double baselineRuntimeSec = baselinePerf.runtimeSec;
+ double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0;
+ line += String.format("%+11.2f%% ", diff);
+ }
+
+ double eventsPerSec = actualPerf.eventsPerSec;
+ line += String.format("%12.1f ", eventsPerSec);
+ if (baselinePerf == null) {
+ line += String.format("%12s ", "");
+ } else {
+ double baselineEventsPerSec = baselinePerf.eventsPerSec;
+ double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0;
+ line += String.format("%+11.2f%% ", diff);
+ }
+
+ long numResults = actualPerf.numResults;
+ line += String.format("%12d ", numResults);
+ if (baselinePerf == null) {
+ line += String.format("%12s", "");
+ } else {
+ long baselineNumResults = baselinePerf.numResults;
+ long diff = numResults - baselineNumResults;
+ line += String.format("%+12d", diff);
+ }
+ }
+ lines.add(line);
+
+ if (actualPerf != null) {
+ List<String> errors = actualPerf.errors;
+ if (errors == null) {
+ errors = new ArrayList<>();
+ errors.add("NexmarkGoogleRunner returned null errors list");
+ }
+ for (String error : errors) {
+ lines.add(String.format(" %4s *** %s ***", "", error));
+ }
+ }
+ }
+
+ lines.add(LINE);
+ lines.add("");
+
+ for (String line : lines) {
+ System.out.println(line);
+ }
+
+ if (summaryFilename != null) {
+ Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8,
+ StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+ NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
+ }
+ }
+
+ /**
+ * Write all perf data and any baselines to a javascript file which can be used by
+ * graphing page etc.
+ */
+ private static void saveJavascript(
+ @Nullable String javascriptFilename,
+ Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+ @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) throws IOException {
+ if (javascriptFilename == null) {
+ return;
+ }
+
+ List<String> lines = new ArrayList<>();
+ lines.add(String.format(
+ "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+ lines.add("var all = [");
+
+ for (NexmarkConfiguration configuration : configurations) {
+ lines.add(" {");
+ lines.add(String.format(" config: %s", configuration));
+ NexmarkPerf actualPerf = actual.get(configuration);
+ if (actualPerf != null) {
+ lines.add(String.format(" ,perf: %s", actualPerf));
+ }
+ NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
+ if (baselinePerf != null) {
+ lines.add(String.format(" ,baseline: %s", baselinePerf));
+ }
+ lines.add(" },");
+ }
+
+ lines.add("];");
+
+ Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8,
+ StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+ NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
+ }
+}
+
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkGoogleRunner.java
new file mode 100644
index 000000000000..79883e0773ff
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkGoogleRunner.java
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * Run a single Nexmark query using a given configuration on Google Dataflow.
+ */
+class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogleOptions> {
+ /**
+ * How long to let streaming pipeline run after all events have been generated and we've
+ * seen no activity.
+ */
+ private static final Duration DONE_DELAY = Duration.standardMinutes(1);
+
+ /**
+ * How long to allow no activity without warning.
+ */
+ private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10);
+
+ /**
+ * How long to let streaming pipeline run after we've
+ * seen no activity, even if not all events have been generated.
+ */
+ private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3);
+
+ /**
+ * Delay between perf samples.
+ */
+ private static final Duration PERF_DELAY = Duration.standardSeconds(15);
+
+ /**
+ * Minimum number of samples needed for 'steady-state' rate calculation.
+ */
+ private static final int MIN_SAMPLES = 9;
+
+ /**
+ * Minimum length of time over which to consider samples for 'steady-state' rate calculation.
+ */
+ private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
+
+ public NexmarkGoogleRunner(NexmarkGoogleDriver.NexmarkGoogleOptions options) {
+ super(options);
+ }
+
+ @Override
+ protected boolean isStreaming() {
+ return options.isStreaming();
+ }
+
+ @Override
+ protected int coresPerWorker() {
+ String machineType = options.getWorkerMachineType();
+ if (machineType == null || machineType.isEmpty()) {
+ return 1;
+ }
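+ // Worker machine types such as "n1-standard-4" encode the core count in their last component.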
+ String[] split = machineType.split("-");
+ if (split.length != 3) {
+ return 1;
+ }
+ try {
+ return Integer.parseInt(split[2]);
+ } catch (NumberFormatException ex) {
+ return 1;
+ }
+ }
+
+ @Override
+ protected int maxNumWorkers() {
+ return Math.max(options.getNumWorkers(), options.getMaxNumWorkers());
+ }
+
+ @Override
+ protected boolean isBlocking() {
+ return options.getRunner() == BlockingDataflowPipelineRunner.class;
+ }
+
+ @Override
+ protected boolean canMonitor() {
+ return options.getRunner() == DataflowPipelineRunner.class;
+ }
+
+ @Override
+ protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
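+ // Temporarily rename the job and cap worker counts so the publish-only pipeline runs as a
+ // separate, smaller "p-" prefixed job; the original settings are restored in the finally block.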
+ String jobName = options.getJobName();
+ String appName = options.getAppName();
+ options.setJobName("p-" + jobName);
+ options.setAppName("p-" + appName);
+ int coresPerWorker = coresPerWorker();
+ int eventGeneratorWorkers = (configuration.numEventGenerators + coresPerWorker - 1)
+ / coresPerWorker;
+ options.setMaxNumWorkers(Math.min(options.getMaxNumWorkers(), eventGeneratorWorkers));
+ options.setNumWorkers(Math.min(options.getNumWorkers(), eventGeneratorWorkers));
+ publisherMonitor = new Monitor(queryName, "publisher");
+ try {
+ builder.build(options);
+ } finally {
+ options.setJobName(jobName);
+ options.setAppName(appName);
+ options.setMaxNumWorkers(options.getMaxNumWorkers());
+ options.setNumWorkers(options.getNumWorkers());
+ }
+ }
+
+ /**
+ * Monitor the progress of the publisher job. Return when it has been generating events for
+ * at least {@code configuration.preloadSeconds}.
+ */
+ @Override
+ protected void waitForPublisherPreload() {
+ Preconditions.checkNotNull(publisherMonitor);
+ Preconditions.checkNotNull(publisherResult);
+ if (!options.getMonitorJobs()) {
+ return;
+ }
+ if (options.getRunner() != DataflowPipelineRunner.class) {
+ return;
+ }
+ if (!(publisherResult instanceof DataflowPipelineJob)) {
+ return;
+ }
+ if (configuration.preloadSeconds <= 0) {
+ return;
+ }
+
+ NexmarkUtils.console("waiting for publisher to pre-load");
+
+ DataflowPipelineJob job = (DataflowPipelineJob) publisherResult;
+
+ long numEvents = 0;
+ long startMsSinceEpoch = -1;
+ long endMsSinceEpoch = -1;
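+ // startMsSinceEpoch records when the publisher first reported events; endMsSinceEpoch is when
+ // preloading should be considered complete.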
+ while (true) {
+ PipelineResult.State state = job.getState();
+ switch (state) {
+ case UNKNOWN:
+ // Keep waiting.
+ NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+ break;
+ case STOPPED:
+ case DONE:
+ case CANCELLED:
+ case FAILED:
+ case UPDATED:
+ NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+ return;
+ case RUNNING:
+ numEvents = getLong(job, publisherMonitor.getElementCounter());
+ if (startMsSinceEpoch < 0 && numEvents > 0) {
+ startMsSinceEpoch = System.currentTimeMillis();
+ endMsSinceEpoch = startMsSinceEpoch
+ + Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+ }
+ if (endMsSinceEpoch < 0) {
+ NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+ } else {
+ long remainMs = endMsSinceEpoch - System.currentTimeMillis();
+ if (remainMs > 0) {
+ NexmarkUtils.console("%s publisher (%d events, waiting for %ds)", state, numEvents,
+ remainMs / 1000);
+ } else {
+ NexmarkUtils.console("publisher preloaded %d events", numEvents);
+ return;
+ }
+ }
+ break;
+ }
+
+ try {
+ Thread.sleep(PERF_DELAY.getMillis());
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException("Interrupted: publisher still running.");
+ }
+ }
+ }
+
+ /**
+ * Monitor the performance and progress of a running job. Return final performance if
+ * it was measured.
+ */
+ @Override
+ @Nullable
+ protected NexmarkPerf monitor(NexmarkQuery query) {
+ if (!options.getMonitorJobs()) {
+ return null;
+ }
+ if (options.getRunner() != DataflowPipelineRunner.class) {
+ return null;
+ }
+ if (!(mainResult instanceof DataflowPipelineJob)) {
+ return null;
+ }
+
+ // If we are not in debug mode, we have no event count or result count monitors.
+ boolean monitorsActive = configuration.debug;
+
+ if (monitorsActive) {
+ NexmarkUtils.console("Waiting for main pipeline to 'finish'");
+ } else {
+ NexmarkUtils.console("--debug=false, so job will not self-cancel");
+ }
+
+ DataflowPipelineJob job = (DataflowPipelineJob) mainResult;
+ DataflowPipelineJob publisherJob = (DataflowPipelineJob) publisherResult;
+ List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>();
+ long startMsSinceEpoch = System.currentTimeMillis();
+ long endMsSinceEpoch = -1;
+ if (options.getRunningTimeMinutes() != null) {
+ endMsSinceEpoch = startMsSinceEpoch
+ + Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis()
+ - Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+ }
+ long lastActivityMsSinceEpoch = -1;
+ NexmarkPerf perf = null;
+ boolean waitingForShutdown = false;
+ boolean publisherCancelled = false;
+ List<String> errors = new ArrayList<>();
+
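+ // Poll the job at PERF_DELAY intervals, capturing perf snapshots and deciding when to cancel
+ // the main and publisher jobs.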
+ while (true) {
+ long now = System.currentTimeMillis();
+ if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
+ NexmarkUtils.console("Reached end of test, cancelling job");
+ try {
+ job.cancel();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to cancel main job: ", e);
+ }
+ if (publisherResult != null) {
+ try {
+ publisherJob.cancel();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to cancel publisher job: ", e);
+ }
+ publisherCancelled = true;
+ }
+ waitingForShutdown = true;
+ }
+
+ PipelineResult.State state = job.getState();
+ NexmarkUtils.console("%s %s%s", state, queryName,
+ waitingForShutdown ? " (waiting for shutdown)" : "");
+
+ NexmarkPerf currPerf;
+ if (monitorsActive) {
+ currPerf = currentPerf(startMsSinceEpoch, now, job, snapshots, query.eventMonitor,
+ query.resultMonitor);
+ } else {
+ currPerf = null;
+ }
+
+ if (perf == null || perf.anyActivity(currPerf)) {
+ lastActivityMsSinceEpoch = now;
+ }
+
+ if (options.isStreaming() && !waitingForShutdown) {
+ Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
+ long fatalCount = query.getFatalCount() == null ? 0 : getLong(job, query.getFatalCount());
+ if (fatalCount > 0) {
+ NexmarkUtils.console("job has fatal errors, cancelling.");
+ errors.add(String.format("Pipeline reported %d fatal errors", fatalCount));
+ waitingForShutdown = true;
+ } else if (monitorsActive && configuration.numEvents > 0
+ && currPerf.numEvents == configuration.numEvents
+ && currPerf.numResults >= 0 && quietFor.isLongerThan(DONE_DELAY)) {
+ NexmarkUtils.console("streaming query appears to have finished, cancelling job.");
+ waitingForShutdown = true;
+ } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
+ NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job.");
+ errors.add("Streaming job was cancelled since appeared stuck");
+ waitingForShutdown = true;
+ } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) {
+ NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.",
+ quietFor.getStandardMinutes());
+ errors.add(
+ String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
+ }
+
+ errors.addAll(checkWatermarks(job, startMsSinceEpoch));
+
+ if (waitingForShutdown) {
+ try {
+ job.cancel();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to cancel main job: ", e);
+ }
+ }
+ }
+
+ perf = currPerf;
+
+ boolean running = true;
+ switch (state) {
+ case UNKNOWN:
+ case STOPPED:
+ case RUNNING:
+ // Keep going.
+ break;
+ case DONE:
+ // All done.
+ running = false;
+ break;
+ case CANCELLED:
+ running = false;
+ if (!waitingForShutdown) {
+ errors.add("Job was unexpectedly cancelled");
+ }
+ break;
+ case FAILED:
+ case UPDATED:
+ // Abnormal termination.
+ running = false;
+ errors.add("Job was unexpectedly updated");
+ break;
+ }
+
+ if (!running) {
+ break;
+ }
+
+ if (lastActivityMsSinceEpoch == now) {
+ NexmarkUtils.console("new perf %s", perf);
+ } else {
+ NexmarkUtils.console("no activity");
+ }
+
+ try {
+ Thread.sleep(PERF_DELAY.getMillis());
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ NexmarkUtils.console("Interrupted: pipeline is still running");
+ }
+ }
+
+ perf.errors = errors;
+ perf.snapshots = snapshots;
+
+ if (publisherResult != null) {
+ NexmarkUtils.console("Shutting down publisher pipeline.");
+ try {
+ if (!publisherCancelled) {
+ publisherJob.cancel();
+ }
+ publisherJob.waitToFinish(5, TimeUnit.MINUTES, null);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to cancel publisher job: ", e);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException("Interrupted: publish job still running.", e);
+ }
+ }
+
+ return perf;
+ }
+
+ enum MetricType {
+ SYSTEM_WATERMARK,
+ DATA_WATERMARK,
+ OTHER
+ }
+
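+ /** Classify a Dataflow metric update as a system watermark, data watermark, or other metric. */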
+ private MetricType getMetricType(MetricUpdate metric) {
+ String metricName = metric.getName().getName();
+ if (metricName.endsWith("windmill-system-watermark")) {
+ return MetricType.SYSTEM_WATERMARK;
+ } else if (metricName.endsWith("windmill-data-watermark")) {
+ return MetricType.DATA_WATERMARK;
+ } else {
+ return MetricType.OTHER;
+ }
+ }
+
+ /**
+ * Check that watermarks are not too far behind.
+ *
+ * Returns a list of errors detected.
+ */
+ private List<String> checkWatermarks(DataflowPipelineJob job, long startMsSinceEpoch) {
+ long now = System.currentTimeMillis();
+ List<String> errors = new ArrayList<>();
+ try {
+ JobMetrics metricResponse = job.getDataflowClient()
+ .projects()
+ .jobs()
+ .getMetrics(job.getProjectId(), job.getJobId())
+ .execute();
+ List<MetricUpdate> metrics = metricResponse.getMetrics();
+ if (metrics != null) {
+ boolean foundWatermarks = false;
+ for (MetricUpdate metric : metrics) {
+ MetricType type = getMetricType(metric);
+ if (type == MetricType.OTHER) {
+ continue;
+ }
+ foundWatermarks = true;
+ @SuppressWarnings("unchecked")
+ BigDecimal scalar = (BigDecimal) metric.getScalar();
+ if (scalar.signum() < 0) {
+ continue;
+ }
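+ // The watermark scalar is assumed to be microseconds since the epoch; divide by 1000 to
+ // obtain milliseconds before constructing an Instant.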
+ Instant value =
+ new Instant(scalar.divideToIntegralValue(new BigDecimal(1000)).longValueExact());
+ Instant updateTime = Instant.parse(metric.getUpdateTime());
+
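+ // Only validate watermark lag once any configured warm-up delay has elapsed.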
+ if (options.getWatermarkValidationDelaySeconds() == null
+ || now > startMsSinceEpoch
+ + Duration.standardSeconds(options.getWatermarkValidationDelaySeconds())
+ .getMillis()) {
+ Duration threshold = null;
+ if (type == MetricType.SYSTEM_WATERMARK && options.getMaxSystemLagSeconds() != null) {
+ threshold = Duration.standardSeconds(options.getMaxSystemLagSeconds());
+ } else if (type == MetricType.DATA_WATERMARK
+ && options.getMaxDataLagSeconds() != null) {
+ threshold = Duration.standardSeconds(options.getMaxDataLagSeconds());
+ }
+
+ if (threshold != null && value.isBefore(updateTime.minus(threshold))) {
+ String msg = String.format("High lag for %s: %s vs %s (allowed lag of %s)",
+ metric.getName().getName(), value, updateTime, threshold);
+ errors.add(msg);
+ NexmarkUtils.console(msg);
+ }
+ }
+ }
+ if (!foundWatermarks) {
+ NexmarkUtils.console("No known watermarks in update: " + metrics);
+ if (now > startMsSinceEpoch + Duration.standardMinutes(5).getMillis()) {
+ errors.add("No known watermarks found. Metrics were " + metrics);
+ }
+ }
+ }
+ } catch (IOException e) {
+ NexmarkUtils.console("Warning: failed to get JobMetrics: " + e);
+ }
+
+ return errors;
+ }
+
+ /**
+ * Return the current performance given {@code eventMonitor} and {@code resultMonitor}.
+ */
+ private NexmarkPerf currentPerf(
+ long startMsSinceEpoch, long now, DataflowPipelineJob job,
+ List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor,
+ Monitor<?> resultMonitor) {
+ NexmarkPerf perf = new NexmarkPerf();
+
+ long numEvents = getLong(job, eventMonitor.getElementCounter());
+ long numEventBytes = getLong(job, eventMonitor.getBytesCounter());
+ long eventStart = getTimestamp(now, job, eventMonitor.getStartTime());
+ long eventEnd = getTimestamp(now, job, eventMonitor.getEndTime());
+ long numResults = getLong(job, resultMonitor.getElementCounter());
+ long numResultBytes = getLong(job, resultMonitor.getBytesCounter());
+ long resultStart = getTimestamp(now, job, resultMonitor.getStartTime());
+ long resultEnd = getTimestamp(now, job, resultMonitor.getEndTime());
+ long timestampStart = getTimestamp(now, job, resultMonitor.getStartTimestamp());
+ long timestampEnd = getTimestamp(now, job, resultMonitor.getEndTimestamp());
+
+ long effectiveEnd = -1;
+ if (eventEnd >= 0 && resultEnd >= 0) {
+ // It is possible for events to be generated after the last result was emitted.
+ // (Eg Query 2, which only yields results for a small prefix of the event stream.)
+ // So use the max of last event and last result times.
+ effectiveEnd = Math.max(eventEnd, resultEnd);
+ } else if (resultEnd >= 0) {
+ effectiveEnd = resultEnd;
+ } else if (eventEnd >= 0) {
+ // During startup we may have no result yet, but we would still like to track how
+ // long the pipeline has been running.
+ effectiveEnd = eventEnd;
+ }
+
+ if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
+ perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0;
+ }
+
+ if (numEvents >= 0) {
+ perf.numEvents = numEvents;
+ }
+
+ if (numEvents >= 0 && perf.runtimeSec > 0.0) {
+ // For streaming we may later replace this with a 'steady-state' value calculated
+ // from the progress snapshots.
+ perf.eventsPerSec = numEvents / perf.runtimeSec;
+ }
+
+ if (numEventBytes >= 0 && perf.runtimeSec > 0.0) {
+ perf.eventBytesPerSec = numEventBytes / perf.runtimeSec;
+ }
+
+ if (numResults >= 0) {
+ perf.numResults = numResults;
+ }
+
+ if (numResults >= 0 && perf.runtimeSec > 0.0) {
+ perf.resultsPerSec = numResults / perf.runtimeSec;
+ }
+
+ if (numResultBytes >= 0 && perf.runtimeSec > 0.0) {
+ perf.resultBytesPerSec = numResultBytes / perf.runtimeSec;
+ }
+
+ if (eventStart >= 0) {
+ perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0;
+ }
+
+ if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) {
+ perf.processingDelaySec = (resultStart - eventStart) / 1000.0;
+ }
+
+ if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) {
+ double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0;
+ perf.timeDilation = eventRuntimeSec / perf.runtimeSec;
+ }
+
+ if (resultEnd >= 0) {
+ // Fill in the shutdown delay assuming the job has now finished.
+ perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
+ }
+
+ perf.jobId = job.getJobId();
+ // As soon as available, try to capture cumulative cost at this point too.
+
+ NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
+ snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0;
+ snapshot.runtimeSec = perf.runtimeSec;
+ snapshot.numEvents = numEvents;
+ snapshot.numResults = numResults;
+ snapshots.add(snapshot);
+
+ captureSteadyState(perf, snapshots);
+
+ return perf;
+ }
+
+ /**
+ * Find a 'steady state' events/sec from {@code snapshots} and
+ * store it in {@code perf} if found.
+ */
+ private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
+ if (!options.isStreaming()) {
+ return;
+ }
+
+ // Find the first sample with actual event and result counts.
+ int dataStart = 0;
+ for (; dataStart < snapshots.size(); dataStart++) {
+ if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) {
+ break;
+ }
+ }
+
+ // Find the last sample which demonstrated progress.
+ int dataEnd = snapshots.size() - 1;
+ for (; dataEnd > dataStart; dataEnd--) {
+ if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) {
+ break;
+ }
+ }
+
+ int numSamples = dataEnd - dataStart + 1;
+ if (numSamples < MIN_SAMPLES) {
+ // Not enough samples.
+ NexmarkUtils.console("%d samples not enough to calculate steady-state event rate",
+ numSamples);
+ return;
+ }
+
+ // We'll look at only the middle third of the samples, to exclude ramp-up and drain-down effects.
+ int sampleStart = dataStart + numSamples / 3;
+ int sampleEnd = dataEnd - numSamples / 3;
+
+ double sampleSec =
+ snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart;
+ if (sampleSec < MIN_WINDOW.getStandardSeconds()) {
+ // Not sampled over enough time.
+ NexmarkUtils.console(
+ "sample of %.1f sec not long enough to calculate steady-state event rate",
+ sampleSec);
+ return;
+ }
+
+ // Find rate with least squares error.
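+ // With a zero intercept, the least-squares slope is sum(x*y) / sum(x*x), which here gives the
+ // steady-state events per second of effective runtime.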
+ double sumxx = 0.0;
+ double sumxy = 0.0;
+ long prevNumEvents = -1;
+ for (int i = sampleStart; i <= sampleEnd; i++) {
+ if (prevNumEvents == snapshots.get(i).numEvents) {
+ // Skip samples with no change in number of events since they contribute no data.
+ continue;
+ }
+ // Use the effective runtime instead of wallclock time so we can
+ // insulate ourselves from delays and stutters in the query manager.
+ double x = snapshots.get(i).runtimeSec;
+ prevNumEvents = snapshots.get(i).numEvents;
+ double y = prevNumEvents;
+ sumxx += x * x;
+ sumxy += x * y;
+ }
+ double eventsPerSec = sumxy / sumxx;
+ NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec);
+ perf.eventsPerSec = eventsPerSec;
+ }
+
+ /**
+ * Return the current value for a long counter, or -1 if can't be retrieved.
+ */
+ private long getLong(DataflowPipelineJob job, Aggregator<Long, Long> aggregator) {
+ try {
+ Collection<Long> values = job.getAggregatorValues(aggregator).getValues();
+ if (values.size() != 1) {
+ return -1;
+ }
+ return Iterables.getOnlyElement(values);
+ } catch (AggregatorRetrievalException e) {
+ return -1;
+ }
+ }
+
+ /**
+ * Return the current value for a time counter, or -1 if can't be retrieved.
+ */
+ private long getTimestamp(
+ long now, DataflowPipelineJob job, Aggregator<Long, Long> aggregator) {
+ try {
+ Collection<Long> values = job.getAggregatorValues(aggregator).getValues();
+ if (values.size() != 1) {
+ return -1;
+ }
+ long value = Iterables.getOnlyElement(values);
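+ // Treat values implausibly far from the current time as not being real timestamps.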
+ if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
+ return -1;
+ }
+ return value;
+ } catch (AggregatorRetrievalException e) {
+ return -1;
+ }
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkPerf.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkPerf.java
new file mode 100644
index 000000000000..4581a58cf85d
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkPerf.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Summary of performance for a particular run of a configuration.
+ */
+class NexmarkPerf {
+ /**
+ * A sample of the number of events and number of results (if known) generated at
+ * a particular time.
+ */
+ public static class ProgressSnapshot {
+ /** Seconds since job was started (in wallclock time). */
+ @JsonProperty
+ double secSinceStart;
+
+ /** Job runtime in seconds (time from first event to last generated event or output result). */
+ @JsonProperty
+ double runtimeSec;
+
+ /** Cumulative number of events generated. -1 if not known. */
+ @JsonProperty
+ long numEvents;
+
+ /** Cumulative number of results emitted. -1 if not known. */
+ @JsonProperty
+ long numResults;
+
+ /**
+ * Return true if there looks to be activity between {@code this} and {@code that}
+ * snapshots.
+ */
+ public boolean anyActivity(ProgressSnapshot that) {
+ if (runtimeSec != that.runtimeSec) {
+ // An event or result end timestamp looks to have changed.
+ return true;
+ }
+ if (numEvents != that.numEvents) {
+ // Some more events were generated.
+ return true;
+ }
+ if (numResults != that.numResults) {
+ // Some more results were emitted.
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Progress snapshots. Null if not yet calculated.
+ */
+ @JsonProperty
+ @Nullable
+ public List<ProgressSnapshot> snapshots = null;
+
+ /**
+ * Effective runtime, in seconds. Measured from the timestamp of the first generated event to
+ * the later of the timestamp of the last generated event and the last emitted result. -1 if not known.
+ */
+ @JsonProperty
+ public double runtimeSec = -1.0;
+
+ /**
+ * Number of events generated. -1 if not known.
+ */
+ @JsonProperty
+ public long numEvents = -1;
+
+ /**
+ * Number of events generated per second of runtime. For batch this is number of events
+ * over the above runtime. For streaming this is the 'steady-state' event generation rate sampled
+ * over the lifetime of the job. -1 if not known.
+ */
+ @JsonProperty
+ public double eventsPerSec = -1.0;
+
+ /**
+ * Number of event bytes generated per second of runtime. -1 if not known.
+ */
+ @JsonProperty
+ public double eventBytesPerSec = -1.0;
+
+ /**
+ * Number of results emitted. -1 if not known.
+ */
+ @JsonProperty
+ public long numResults = -1;
+
+ /**
+ * Number of results generated per second of runtime. -1 if not known.
+ */
+ @JsonProperty
+ public double resultsPerSec = -1.0;
+
+ /**
+ * Number of result bytes generated per second of runtime. -1 if not known.
+ */
+ @JsonProperty
+ public double resultBytesPerSec = -1.0;
+
+ /**
+ * Delay between start of job and first event, in seconds. -1 if not known.
+ */
+ @JsonProperty
+ public double startupDelaySec = -1.0;
+
+ /**
+ * Delay between first event and first result in seconds. -1 if not known.
+ */
+ @JsonProperty
+ public double processingDelaySec = -1.0;
+
+ /**
+ * Delay between last result and job completion in seconds. -1 if not known.
+ */
+ @JsonProperty
+ public double shutdownDelaySec = -1.0;
+
+ /**
+ * Time-dilation factor, calculated as the rate of event-time advancement relative to real time.
+ * Greater than one implies we processed events faster than they would have been generated
+ * in real time. Less than one implies we could not keep up with events in real time.
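+ * (For example, a value of 2 means event time advanced roughly twice as fast as wallclock time.)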
+ * -1 if not known.
+ */
+ @JsonProperty
+ double timeDilation = -1.0;
+
+ /**
+ * List of errors encountered during job execution.
+ */
+ @JsonProperty
+ @Nullable
+ public List<String> errors = null;
+
+ /**
+ * The job id this perf was drawn from. Null if not known.
+ */
+ @JsonProperty
+ @Nullable
+ public String jobId = null;
+
+ /**
+ * Return a JSON representation of performance.
+ */
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Parse a {@link NexmarkPerf} object from JSON {@code string}.
+ *
+ * @throws RuntimeException if {@code string} cannot be parsed
+ */
+ public static NexmarkPerf fromString(String string) {
+ try {
+ return NexmarkUtils.MAPPER.readValue(string, NexmarkPerf.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to parse nexmark perf: ", e);
+ }
+ }
+
+ /**
+ * Return true if there looks to be activity between {@code this} and {@code that}
+ * perf values.
+ */
+ public boolean anyActivity(NexmarkPerf that) {
+ if (runtimeSec != that.runtimeSec) {
+ // An event or result end timestamp looks to have changed.
+ return true;
+ }
+ if (numEvents != that.numEvents) {
+ // Some more events were generated.
+ return true;
+ }
+ if (numResults != that.numResults) {
+ // Some more results were emitted.
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkQuery.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkQuery.java
new file mode 100644
index 000000000000..44ba2d4ef8cc
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkQuery.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TimestampedValue;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * Base class for the eight 'NEXMark' queries. Supplies some fragments common to
+ * multiple queries.
+ */
+public abstract class NexmarkQuery
+ extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
+ protected static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
+ protected static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
+ protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
+
+ /** Predicate to detect a new person event. */
+ protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
+ new SerializableFunction<Event, Boolean>() {
+ @Override
+ public Boolean apply(Event event) {
+ return event.newPerson != null;
+ }
+ };
+
+ /** DoFn to convert a new person event to a person. */
+ protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().newPerson);
+ }
+ };
+
+ /** Predicate to detect a new auction event. */
+ protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
+ new SerializableFunction<Event, Boolean>() {
+ @Override
+ public Boolean apply(Event event) {
+ return event.newAuction != null;
+ }
+ };
+
+ /** DoFn to convert a new auction event to an auction. */
+ protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().newAuction);
+ }
+ };
+
+ /** Predicate to detect a new bid event. */
+ protected static final SerializableFunction<Event, Boolean> IS_BID =
+ new SerializableFunction<Event, Boolean>() {
+ @Override
+ public Boolean apply(Event event) {
+ return event.bid != null;
+ }
+ };
+
+ /** DoFn to convert a bid event to a bid. */
+ protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().bid);
+ }
+ };
+
+ /** Transform to key each person by their id. */
+ protected static final ParDo.Bound<Person, KV<Long, Person>> PERSON_BY_ID =
+ ParDo.named("PersonById")
+ .of(new DoFn<Person, KV<Long, Person>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().id, c.element()));
+ }
+ });
+
+ /** Transform to key each auction by its id. */
+ protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_ID =
+ ParDo.named("AuctionById")
+ .of(new DoFn<Auction, KV<Long, Auction>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().id, c.element()));
+ }
+ });
+
+ /** Transform to key each auction by its seller id. */
+ protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
+ ParDo.named("AuctionBySeller")
+ .of(new DoFn<Auction, KV<Long, Auction>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().seller, c.element()));
+ }
+ });
+
+ /** Transform to key each bid by its auction id. */
+ protected static final ParDo.Bound<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+ ParDo.named("BidByAuction")
+ .of(new DoFn<Bid, KV<Long, Bid>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().auction, c.element()));
+ }
+ });
+
+ /** Transform to project the auction id from each bid. */
+ protected static final ParDo.Bound<Bid, Long> BID_TO_AUCTION =
+ ParDo.named("BidToAuction")
+ .of(new DoFn<Bid, Long>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().auction);
+ }
+ });
+
+ /** Transform to project the price from each bid. */
+ protected static final ParDo.Bound<Bid, Long> BID_TO_PRICE =
+ ParDo.named("BidToPrice")
+ .of(new DoFn<Bid, Long>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().price);
+ }
+ });
+
+ /** Transform to emit each event with the timestamp embedded within it. */
+ public static final ParDo.Bound<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
+ ParDo.named("OutputWithTimestamp")
+ .of(new DoFn<Event, Event>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ Event e = c.element();
+ if (e.bid != null) {
+ c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
+ } else if (e.newPerson != null) {
+ c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
+ } else if (e.newAuction != null) {
+ c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
+ }
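+ // Note: an event with none of the three payloads set is silently dropped (no output).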
+ }
+ });
+
+ /**
+ * Transform to filter for just the new auction events.
+ */
+ protected static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS =
+ new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") {
+ @Override
+ public PCollection<Auction> apply(PCollection<Event> input) {
+ return input.apply(Filter.byPredicate(IS_NEW_AUCTION).named("IsAuction"))
+ .apply(ParDo.named("AsAuction").of(AS_AUCTION));
+ }
+ };
+
+ /**
+ * Transform to filter for just the new person events.
+ */
+ protected static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS =
+ new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") {
+ @Override
+ public PCollection<Person> apply(PCollection<Event> input) {
+ return input.apply(Filter.byPredicate(IS_NEW_PERSON).named("IsPerson"))
+ .apply(ParDo.named("AsPerson").of(AS_PERSON));
+ }
+ };
+
+ /**
+ * Transform to filter for just the bid events.
+ */
+ protected static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS =
+ new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
+ @Override
+ public PCollection<Bid> apply(PCollection<Event> input) {
+ return input.apply(Filter.byPredicate(IS_BID).named("IsBid"))
+ .apply(ParDo.named("AsBid").of(AS_BID));
+ }
+ };
+
+ protected final NexmarkConfiguration configuration;
+ public final Monitor<Event> eventMonitor;
+ public final Monitor<KnownSize> resultMonitor;
+ public final Monitor<Event> endOfStreamMonitor;
+
+ protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
+ super(name);
+ this.configuration = configuration;
+ if (configuration.debug) {
+ eventMonitor = new Monitor(name + ".Events", "event");
+ resultMonitor = new Monitor(name + ".Results", "result");
+ endOfStreamMonitor = new Monitor(name + ".EndOfStream", "end");
+ } else {
+ eventMonitor = null;
+ resultMonitor = null;
+ endOfStreamMonitor = null;
+ }
+ }
+
+ /**
+ * Return the aggregator which counts fatal errors in this query. Return null if no such
+ * aggregator.
+ */
+ @Nullable
+ public Aggregator<Long, Long> getFatalCount() {
+ return null;
+ }
+
+ /**
+ * Implement the actual query. All we know about the result is that it has a known encoded size.
+ */
+ protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
+
+ @Override
+ public PCollection<TimestampedValue<KnownSize>> apply(PCollection<Event> events) {
+
+ if (configuration.debug) {
+ events =
+ events
+ // Monitor events as they go by.
+ .apply(eventMonitor.getTransform())
+ // Count each type of event.
+ .apply(NexmarkUtils.snoop(name));
+ }
+
+ if (configuration.cpuDelayMs > 0) {
+ // Slow down by pegging one core at 100%.
+ events = events.apply(NexmarkUtils.cpuDelay(name, configuration.cpuDelayMs));
+ }
+
+ if (configuration.diskBusyBytes > 0) {
+ // Slow down by forcing bytes to durable store.
+ events = events.apply(NexmarkUtils.diskBusy(name, configuration.diskBusyBytes));
+ }
+
+ // Run the query.
+ PCollection<KnownSize> queryResults = applyPrim(events);
+
+ if (configuration.debug) {
+ // Monitor results as they go by.
+ queryResults = queryResults.apply(resultMonitor.getTransform());
+ }
+ // Timestamp the query results.
+ return queryResults.apply(NexmarkUtils.stamp(name));
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkQueryModel.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkQueryModel.java
new file mode 100644
index 000000000000..c85e24fe5320
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkQueryModel.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base class for models of the eight NEXMark queries. Provides an assertion
+ * function which can be applied against the actual query results to check their consistency
+ * with the model.
+ */
+public abstract class NexmarkQueryModel implements Serializable {
+ /**
+ * Return the start of the most recent window of {@code size} and {@code period} which ends
+ * strictly before {@code timestamp}.
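+ * For example, with a size of 10s, a period of 5s and a timestamp of 17s, the window [5s, 15s) is the most recent one ending strictly before 17s, so 5s is returned.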
+ */
+ public static Instant windowStart(Duration size, Duration period, Instant timestamp) {
+ long ts = timestamp.getMillis();
+ long p = period.getMillis();
+ long lim = ts - ts % p;
+ long s = size.getMillis();
+ return new Instant(lim - s);
+ }
+
+ protected final NexmarkConfiguration configuration;
+
+ public NexmarkQueryModel(NexmarkConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ /**
+ * Convert {@code itr} to strings capturing values, timestamps and order.
+ */
+ protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
+ List<String> strings = new ArrayList<>();
+ while (itr.hasNext()) {
+ strings.add(itr.next().toString());
+ }
+ return strings;
+ }
+
+ /**
+ * Convert {@code itr} to strings capturing values and order.
+ */
+ protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
+ List<String> strings = new ArrayList<>();
+ while (itr.hasNext()) {
+ strings.add(itr.next().getValue().toString());
+ }
+ return strings;
+ }
+
+ /**
+ * Convert {@code itr} to strings capturing values only.
+ */
+ protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
+ Set<String> strings = new HashSet<>();
+ while (itr.hasNext()) {
+ strings.add(itr.next().getValue().toString());
+ }
+ return strings;
+ }
+
+ /** Return simulator for query. */
+ protected abstract AbstractSimulator<?, ?> simulator();
+
+ /** Return sub-sequence of results which are significant for model. */
+ protected <T> Iterable<TimestampedValue<T>> relevantResults(
+ Iterable<TimestampedValue<T>> results) {
+ return results;
+ }
+
+ /**
+ * Convert iterator of elements to collection of strings to use when testing coherence
+ * of model against actual query results.
+ */
+ protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr);
+
+ /**
+ * Return assertion to use on results of pipeline for this query.
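+ * Note: the expected results are computed eagerly from the model's simulator when this method is called and are captured by the returned function.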
+ */
+ public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
+ final Collection<String> expectedStrings = toCollection(simulator().results());
+
+ return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
+ @Override
+ public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
+ Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
+ Assert.assertEquals(expectedStrings, actualStrings);
+ return null;
+ }
+ };
+ }
+}
diff --git a/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkRunner.java b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkRunner.java
new file mode 100644
index 000000000000..3b2a9d5331c9
--- /dev/null
+++ b/integration/java/src/main/java/com/google/cloud/dataflow/integration/nexmark/NexmarkRunner.java
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.integration.nexmark;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TimestampedValue;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Run a single Nexmark query using a given configuration.
+ */
+public abstract class NexmarkRunner<OptionT extends Options> {
+ /**
+ * Options shared by all runs.
+ */
+ protected final OptionT options;
+
+ /**
+ * Which configuration we are running.
+ */
+ @Nullable
+ protected NexmarkConfiguration configuration;
+
+ /**
+ * Accumulates the Pubsub subscriptions, etc., which should be cleaned up at the end of the run.
+ */
+ @Nullable
+ protected PubsubHelper pubsub;
+
+ /**
+ * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null.
+ */
+ @Nullable
+ protected Monitor<Event> publisherMonitor;
+
+ /**
+ * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null.
+ */
+ @Nullable
+ protected PipelineResult publisherResult;
+
+ /**
+ * Result for the main pipeline.
+ */
+ @Nullable
+ protected PipelineResult mainResult;
+
+ /**
+ * Query name we are running.
+ */
+ @Nullable
+ protected String queryName;
+
+ public NexmarkRunner(OptionT options) {
+ this.options = options;
+ }
+
+ /**
+ * Return the Pubsub helper, creating it on first use.
+ */
+ private PubsubHelper getPubsub() {
+ if (pubsub == null) {
+ pubsub = PubsubHelper.create(options);
+ }
+ return pubsub;
+ }
+
+ // ================================================================================
+ // Overridden by each runner.
+ // ================================================================================
+
+ /**
+ * Is this query running in streaming mode?
+ */
+ protected abstract boolean isStreaming();
+
+ /**
+ * Return number of cores per worker.
+ */
+ protected abstract int coresPerWorker();
+
+ /**
+ * Return maximum number of workers.
+ */
+ protected abstract int maxNumWorkers();
+
+ /**
+ * Return true if runner is blocking.
+ */
+ protected abstract boolean isBlocking();
+
+ /**
+ * Return true if runner can monitor running jobs.
+ */
+ protected abstract boolean canMonitor();
+
+ /**
+ * Build and run a pipeline using specified options.
+ */
+ protected interface PipelineBuilder<OptionT extends Options> {
+ void build(OptionT publishOnlyOptions);
+ }
+
+ /**
+ * Invoke the builder with options suitable for running a publish-only child pipeline.
+ */
+ protected abstract void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder);
+
+ /**
+ * If monitoring, wait until the publisher pipeline has run long enough to establish
+ * a backlog on the Pubsub topic. Otherwise, return immediately.
+ */
+ protected abstract void waitForPublisherPreload();
+
+ /**
+ * If monitoring, print stats on the main pipeline and return the final perf
+ * when it has run long enough. Otherwise, return null immediately.
+ */
+ protected abstract NexmarkPerf monitor(NexmarkQuery query);
+
+ // ================================================================================
+ // Basic sources and sinks
+ // ================================================================================
+
+ /**
+ * Return a source of synthetic events.
+ */
+ private PCollection<Event> sourceFromSynthetic(Pipeline p) {
+ if (isStreaming()) {
+ NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents);
+ return p.apply(NexmarkUtils.streamEventsSource(queryName, configuration));
+ } else {
+ NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents);
+ return p.apply(NexmarkUtils.batchEventsSource(queryName, configuration));
+ }
+ }
+
+ /**
+ * Return source of events from Pubsub.
+ */
+ private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
+ String baseTopic = options.getPubsubTopic();
+ if (baseTopic == null || baseTopic.isEmpty()) {
+ throw new RuntimeException("Missing --pubsubTopic");
+ }
+ String shortTopic;
+ if (options.getUniqify()) {
+ // Salt the topic name so we can run multiple jobs in parallel.
+ shortTopic = String.format("%s_%d_%s_source", baseTopic, now, queryName);
+ } else {
+ shortTopic = String.format("%s_%s_source", baseTopic, queryName);
+ }
+ String shortSubscription = shortTopic;
+ // Create/confirm the subscription.
+ String subscription = null;
+ if (!options.getManageResources()) {
+ // The subscription should already have been created by the user.
+ subscription = getPubsub().reuseSubscription(shortTopic, shortSubscription);
+ } else {
+ subscription = getPubsub().createSubscription(shortTopic, shortSubscription);
+ }
+ NexmarkUtils.console("Reading events from Pubsub %s", subscription);
+ PubsubIO.Read.Bound<Event> io =
+ PubsubIO.Read.named(queryName + ".ReadPubsubEvents(" + subscription.replace('/', '.') + ")")
+ .subscription(subscription)
+ .idLabel(NexmarkUtils.PUBSUB_ID)
+ .withCoder(Event.CODER);
+ if (!configuration.usePubsubPublishTime) {
+ io = io.timestampLabel(NexmarkUtils.PUBSUB_TIMESTAMP);
+ }
+ return p.apply(io);
+ }
+
+ /**
+ * Return Avro source of events from {@code options.getInputFilePrefix}.
+ */
+ private PCollection<Event> sourceFromAvro(Pipeline p) {
+ String filename = options.getInputFilePrefix();
+ if (filename == null || filename.isEmpty()) {
+ throw new RuntimeException("Missing --inputFilePrefix");
+ }
+ NexmarkUtils.console("Reading events from Avro files at %s", filename);
+ return p
+ .apply(AvroIO.Read.named(queryName + ".ReadAvroEvents(" + filename.replace('/', '.') + ")")
+ .from(filename + "*.avro")
+ .withSchema(Event.class))
+ .apply(NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
+ }
+
+ /**
+ * Send {@code events} to Pubsub.
+ */
+ private void sinkEventsToPubsub(PCollection<Event> events, long now) {
+ String baseTopic = options.getPubsubTopic();
+ if (baseTopic == null || baseTopic.isEmpty()) {
+ throw new RuntimeException("Missing --pubsubTopic");
+ }
+ String shortTopic;
+ if (options.getUniqify()) {
+ // Salt the topic name so we can run multiple jobs in parallel.
+ shortTopic = String.format("%s_%d_%s_source", baseTopic, now, queryName);
+ } else {
+ shortTopic = String.format("%s_%s_source", baseTopic, queryName);
+ }
+ // Create/confirm the topic.
+ String topic;
+ if (!options.getManageResources()
+ || configuration.pubSubMode == NexmarkUtils.PubSubMode.SUBSCRIBE_ONLY) {
+ // The topic should already have been created by the user or
+ // a companion 'PUBLISH_ONLY' process.
+ topic = getPubsub().reuseTopic(shortTopic);
+ } else {
+ // Create a fresh topic to loopback via. It will be destroyed when the
+ // (necessarily blocking) job is done.
+ topic = getPubsub().createTopic(shortTopic);
+ }
+ NexmarkUtils.console("Writing events to Pubsub %s", topic);
+ PubsubIO.Write.Bound<Event> io =
+ PubsubIO.Write.named(queryName + ".WritePubsubEvents(" + topic.replace('/', '.') + ")")
+ .topic(topic)
+ .idLabel(NexmarkUtils.PUBSUB_ID)
+ .withCoder(Event.CODER);
+ if (!configuration.usePubsubPublishTime) {
+ io = io.timestampLabel(NexmarkUtils.PUBSUB_TIMESTAMP);
+ }
+ events.apply(io);
+ }
+
+ /**
+ * Send {@code formattedResults} to Pubsub.
+ */
+ private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
+ String baseTopic = options.getPubsubTopic();
+ if (baseTopic == null || baseTopic.isEmpty()) {
+ throw new RuntimeException("Missing --pubsubTopic");
+ }
+ String shortTopic;
+ if (options.getUniqify()) {
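+ // Salt the topic name so we can run multiple jobs in parallel.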
+ shortTopic = String.format("%s_%d_%s_sink", baseTopic, now, queryName);
+ } else {
+ shortTopic = String.format("%s_%s_sink", baseTopic, queryName);
+ }
+ String topic;
+ if (!options.getManageResources()) {
+ topic = getPubsub().reuseTopic(shortTopic);
+ } else {
+ topic = getPubsub().createTopic(shortTopic);
+ }
+ NexmarkUtils.console("Writing results to Pubsub %s", topic);
+ PubsubIO.Write.Bound<String> io =
+ PubsubIO.Write.named(queryName + ".WritePubsubResults(" + topic.replace('/', '.') + ")")
+ .topic(topic)
+ .idLabel(NexmarkUtils.PUBSUB_ID);
+ if (!configuration.usePubsubPublishTime) {
+ io = io.timestampLabel(NexmarkUtils.PUBSUB_TIMESTAMP);
+ }
+ formattedResults.apply(io);
+ }
+
+ /**
+ * Sink all raw Events in {@code source} to {@code options.getOutputPath}.
+ * This will configure the job to write the following files:
+ * <ul>
+ * <li>{@code $outputPath/event*.avro} All Event entities.
+ * <li>{@code $outputPath/auction*.avro} Auction entities.
+ * <li>{@code $outputPath/bid*.avro} Bid entities.
+ * <li>{@code $outputPath/person*.avro} Person entities.
+ * </ul>
+ *
+ * @param source A PCollection of events.
+ */
+ private void sinkToAvro(PCollection<Event> source) {
+ String filename = options.getOutputPath();
+ if (filename == null || filename.isEmpty()) {
+ throw new RuntimeException("Missing --outputPath");
+ }
+ NexmarkUtils.console("Writing events to Avro files at %s", filename);
+ source.apply(AvroIO.Write.named(queryName + ".WriteAvroEvents(" + filename.replace('/', '.')
+ + ")")
+ .to(filename + "/event")
+ .withSuffix(".avro")
+ .withSchema(Event.class));
+ source.apply(NexmarkQuery.JUST_BIDS)
+ .apply(
+ AvroIO.Write.named(queryName + ".WriteAvroBids(" + filename.replace('/', '.') + ")")
+ .to(filename + "/bid")
+ .withSuffix(".avro")
+ .withSchema(Bid.class));
+ source.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
+ .apply(AvroIO.Write.named(
+ queryName + ".WriteAvroAuctions(" + filename.replace('/', '.') + ")")
+ .to(filename + "/auction")
+ .withSuffix(".avro")
+ .withSchema(Auction.class));
+ source.apply(NexmarkQuery.JUST_NEW_PERSONS)
+ .apply(
+ AvroIO.Write.named(queryName + ".WriteAvroPeople(" + filename.replace('/', '.') + ")")
+ .to(filename + "/person")
+ .withSuffix(".avro")
+ .withSchema(Person.class));
+ }
+
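+ /** DoFn to wrap each result string in a {@link TableRow} for writing to BigQuery. */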
+ private static class StringToTableRow extends DoFn<String, TableRow> {
+ @Override
+ public void processElement(ProcessContext c) {
+ TableRow row = new TableRow();
+ row.set("event", c.element());
+ c.output(row);
+ }
+ }
+
+ /**
+ * Send {@code formattedResults} to text files.
+ */
+ private void sinkToText(PCollection<String> formattedResults, long now) {
+ String filename = options.getOutputPath();
+ if (filename == null || filename.isEmpty()) {
+ throw new RuntimeException("Missing --outputPath");
+ }
+ String fullFilename;
+ if (options.getUniqify()) {
+ fullFilename = String.format("%s/nexmark_%d_%s.txt", filename, now, queryName);
+ } else {
+ fullFilename = String.format("%s/nexmark_%s.txt", filename, queryName);
+ }
+ NexmarkUtils.console("Writing results to text files at %s", fullFilename);
+ formattedResults.apply(
+ TextIO.Write.named(queryName + ".WriteTextResults(" + fullFilename.replace('/', '.') + ")")
+ .to(fullFilename));
+ }
+
+ /**
+ * Send {@code formattedResults} to BigQuery.
+ */
+ private void sinkToBigQuery(PCollection<String> formattedResults, long now) {
+ String tableName;
+ if (options.getUniqify()) {
+ tableName = String.format("%s:nexmark.table_%d", options.getProject(), now);
+ } else {
+ tableName = String.format("%s:nexmark.table", options.getProject());
+ }
+ TableSchema schema =
+ new TableSchema()
+ .setFields(ImmutableList.of(new TableFieldSchema().setName("result")
+ .setType("STRING")));
+ NexmarkUtils.console("Writing results to BigQuery table %s", tableName);
+ BigQueryIO.Write.Bound io =
+ BigQueryIO.Write.named(
+ queryName + ".WriteBigQueryResults(" + tableName.replace('/', '.') + ")")
+ .to(tableName)
+ .withSchema(schema);
+ formattedResults
+ .apply(ParDo.of(new StringToTableRow()))
+ .apply(io);
+ }
+
+ // ================================================================================
+ // Construct overall pipeline
+ // ================================================================================
+
+ /**
+ * Return source of events for this run, or null if we are simply publishing events
+ * to Pubsub.
+ */
+ private PCollection<Event> createSource(Pipeline p, final long now) {
+ PCollection<Event> source = null;
+ switch (configuration.sourceType) {
+ case DIRECT:
+ source = sourceFromSynthetic(p);
+ break;
+ case AVRO:
+ source = sourceFromAvro(p);
+ break;
+ case PUBSUB:
+ // Check some flags.
+ switch (configuration.pubSubMode) {
+ case SUBSCRIBE_ONLY:
+ break;
+ case PUBLISH_ONLY:
+ if (options.getManageResources() && !isBlocking()) {
+ throw new RuntimeException(
+ "If --manageResources=true and --pubSubMode=PUBLISH_ONLY then "
+ + "runner must be blocking so that this program "
+ + "can cleanup the Pubsub topic on exit.");
+ }
+ break;
+ case COMBINED:
+ if (!canMonitor() || !options.getMonitorJobs()) {
+ throw new RuntimeException(
+ "if --pubSubMode=COMBINED then you must use a monitoring runner "
+ + "and set --monitorJobs=true so that the two pipelines can be coordinated.");
+ }
+ break;
+ }
+
+ // Setup the sink for the publisher.
+ switch (configuration.pubSubMode) {
+ case SUBSCRIBE_ONLY:
+ // Nothing to publish.
+ break;
+ case PUBLISH_ONLY:
+ // Send synthesized events to Pubsub in this job.
+ sinkEventsToPubsub(sourceFromSynthetic(p).apply(NexmarkUtils.snoop(queryName)), now);
+ break;
+ case COMBINED:
+ // Send synthesized events to Pubsub in separate publisher job.
+ // We won't start the main pipeline until the publisher has sent the pre-load events.
+ // We'll shutdown the publisher job when we notice the main job has finished.
+ invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() {
+ @Override
+ public void build(Options publishOnlyOptions) {
+ Pipeline sp = Pipeline.create(options);
+ NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
+ publisherMonitor = new Monitor(queryName, "publisher");
+ sinkEventsToPubsub(sourceFromSynthetic(sp).apply(publisherMonitor.getTransform()),
+ now);
+ publisherResult = sp.run();
+ }
+ });
+ break;
+ }
+
+ // Setup the source for the consumer.
+ switch (configuration.pubSubMode) {
+ case PUBLISH_ONLY:
+ // Nothing to consume. Leave source null.
+ break;
+ case SUBSCRIBE_ONLY:
+ case COMBINED:
+ // Read events from pubsub.
+ source = sourceEventsFromPubsub(p, now);
+ break;
+ }
+ break;
+ }
+ return source;
+ }
+
+ /**
+ * Consume {@code results}.
+ */
+ private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
+ if (configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) {
+ // Avoid the cost of formatting the results.
+ results.apply(NexmarkUtils.devNull(queryName));
+ return;
+ }
+
+ PCollection