Conversation
Thanks for your pull request. It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). 📝 Please visit https://cla.developers.google.com/ to sign. Once you've signed, please reply here (e.g. "I signed it!") and we'll verify it.
I signed the CLA.

CLAs look good, thanks!
The functionality is well tested. Will update the javadoc today.
This certainly looks much better.
@dhalperi Updated the Read interface as we discussed; this needed changes to the tests as well.
```java
 * <h3>Reading from Cloud Bigtable</h3>
 *
 * <p>The Bigtable source returns a set of rows from a single table, returning a
 * {@code PCollection<Row>}.
```
please revert
Done. Eclipse didn't render this properly.
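To make the javadoc above concrete, here is a minimal sketch of reading from such a source into a `PCollection<Row>`. This is a hypothetical illustration only: `BigtableIO.read()` and the `with...()` option names are assumptions, not API confirmed by this PR.

```java
// Hypothetical sketch: BigtableIO.read() and the with...() option names
// are illustrative assumptions, not API confirmed by this PR.
// (Imports for BigtableIO and Row omitted: hypothetical types.)
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class BigtableReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read every row of a single table as a PCollection<Row>.
    PCollection<Row> rows = p.apply(
        BigtableIO.read()
            .withProjectId("my-project")  // hypothetical option name
            .withClusterId("my-cluster")  // hypothetical option name
            .withTableId("my-table"));    // hypothetical option name

    p.run();
  }
}
```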
Daniel already took a look at this a month back; can you re-skim it? Mark has been implementing PubSub. Mark, can you take a look at the Kafka source and see what lessons you can impart?
```java
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
```
No need for repackaged.
Oops! Thanks for catching that.
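For reference, the fix is to import the class from the standard Guava package rather than the copy repackaged inside the API client library:

```java
// Before: repackaged copy pulled in transitively by the API client library.
// import com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;

// After: the standard Guava class.
import com.google.common.annotations.VisibleForTesting;
```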
LGTM
Thanks @mshields822.
Looks like you need to push? I'm going to stop commenting on outstanding feedback :)
Sorry, forgot to push before heading to lunch. Addressed all the comments except the last one. I will double-check any remaining comments.
```java
}

ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
long consumed = pState.consumedOffset;
```
Is there a bound on the number of records that we'll have to skip? We can only log a few thousand per second, so if it's millions, logging each record will induce large delays in starting up.
An alternative would be to set a flag and only log once.
It would be fairly small (tens to hundreds); the upper limit is the number of Kafka records compressed together. The 0.10.x KafkaConsumer has already fixed this issue with compressed messages and skips them itself, so this is not expected at all in the near future.
One case where it could involve millions of records is the one I mention above, where Kafka is restarted from scratch (which resets the offsets) while the Dataflow app is running. Not sure whether we want to handle that case; it is better for the user to notice the problem (which should be very rare) and take appropriate action (perhaps just restarting the Dataflow app).
That said, I can certainly add a flag to limit it to one message.
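A minimal sketch of the log-once flag discussed here; the class, field, and method names are illustrative, not taken from the actual patch:

```java
// Illustrative sketch of the "set a flag and only log once" idea;
// names are hypothetical, not from the actual patch.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SkipLogging {
  private static final Logger LOG = LoggerFactory.getLogger(SkipLogging.class);

  private boolean loggedSkippedRecords = false;

  /** Returns true if the record was already consumed and should be skipped. */
  boolean shouldSkip(ConsumerRecord<byte[], byte[]> rawRecord, long consumedOffset) {
    if (rawRecord.offset() <= consumedOffset) {
      if (!loggedSkippedRecords) {
        // Log the condition once rather than per record, so a large backlog
        // of already-consumed records does not throttle startup.
        LOG.warn("Skipping already-consumed records up to offset {}", consumedOffset);
        loggedSkippedRecords = true;
      }
      return true;
    }
    return false;
  }
}
```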
Closing in favor of apache/beam#142
Current status (3/18/16): well tested and feature complete; going through reviews.
This is a preliminary PR. It is not tested yet. I am working on an example application and unit tests. Some TODOs: