
Conversation

@ryanthompson591
Contributor

@ryanthompson591 ryanthompson591 commented Feb 28, 2022

Make a base class for a transform that runs inferences.

It will measure metrics, load models (using beam.shared), and batch and run inferences.

See design doc: https://s.apache.org/inference-sklearn-pytorch
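
For orientation, here is a minimal sketch of the shape described above. The names (`ModelLoader`, `InferenceRunner`, `RunInferenceImpl`) are illustrative assumptions, not the final API of this PR:

```python
import apache_beam as beam
from apache_beam.utils import shared


class _RunInferenceDoFn(beam.DoFn):
  """Loads the model once per worker via Shared, then runs batches."""
  def __init__(self, model_loader, inference_runner):
    self._model_loader = model_loader
    self._inference_runner = inference_runner
    self._shared_model_handle = shared.Shared()

  def setup(self):
    # All DoFn instances in a worker process share one model object.
    self._model = self._shared_model_handle.acquire(
        self._model_loader.load_model)

  def process(self, batch):
    yield from self._inference_runner.run_inference(batch, self._model)


class RunInferenceImpl(beam.PTransform):
  def __init__(self, model_loader, inference_runner):
    self._model_loader = model_loader
    self._inference_runner = inference_runner

  def expand(self, pcoll):
    return (
        pcoll
        | beam.BatchElements()
        | beam.ParDo(
            _RunInferenceDoFn(self._model_loader, self._inference_runner)))
```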


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch): Build python source distribution and wheels · Python tests · Java tests

See CI.md for more information about GitHub Actions CI.

@github-actions github-actions bot removed the examples label Feb 28, 2022
@codecov

codecov bot commented Feb 28, 2022

Codecov Report

Merging #16970 (50b52ee) into master (33c514b) will increase coverage by 0.02%.
The diff coverage is 92.53%.

@@            Coverage Diff             @@
##           master   #16970      +/-   ##
==========================================
+ Coverage   73.94%   73.96%   +0.02%     
==========================================
  Files         684      685       +1     
  Lines       89519    89653     +134     
==========================================
+ Hits        66197    66315     +118     
- Misses      22162    22178      +16     
  Partials     1160     1160              
Flag     Coverage Δ
python   83.65% <92.53%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files                                          Coverage Δ
sdks/python/apache_beam/ml/inference/base.py            92.53% <92.53%> (ø)
...eam/runners/portability/fn_api_runner/execution.py   92.25% <0.00%> (-0.81%) ⬇️
...che_beam/runners/interactive/interactive_runner.py   92.25% <0.00%> (-0.71%) ⬇️
sdks/python/apache_beam/runners/common.py               89.98% <0.00%> (-0.15%) ⬇️
sdks/python/apache_beam/internal/metrics/metric.py      93.00% <0.00%> (+1.00%) ⬆️

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 33c514b...50b52ee. Read the comment docs.

@ryanthompson591
Contributor Author

R: @yeandy

@yeandy
Contributor

yeandy commented Mar 2, 2022

CC: @TheNeuralBit and @tvalentyn

Contributor

@yeandy yeandy left a comment

First pass looks good


class InferenceRunner:
  """Implements running inferences for a framework."""
  def run_inference(self, batch: Any, model: Any) -> List[PredictionResult]:
Contributor

Do we want Iterable[PredictionResult] instead of List[PredictionResult]?

Contributor Author

Yeah, I like Iterable; it's more generic. Done.
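
For reference, the signature after this change would read roughly as follows (PredictionResult being the result type defined elsewhere in this PR):

```python
from typing import Any, Iterable

class InferenceRunner:
  """Implements running inferences for a framework."""
  def run_inference(
      self, batch: Any, model: Any) -> Iterable[PredictionResult]:
    raise NotImplementedError(type(self))
```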


class ModelLoader:
  """Has the ability to load an ML model."""
  def load_model(self):
Contributor

Suggested change
-  def load_model(self):
+  def load_model(self) -> InferenceRunner:

    raise NotImplementedError(type(self))


class InferenceRunner:
Contributor

Robert had something like

from typing import Any, Generic, Iterable, TypeVar

ExampleBatchType = TypeVar('ExampleBatchType')
InferenceBatchType = TypeVar('InferenceBatchType')

class InferenceRunner(Generic[ExampleBatchType, InferenceBatchType]):
  def to_impl_batch(self, batch: Iterable[Any]) -> ExampleBatchType:
    raise NotImplementedError(type(self))

  def from_impl_batch(self, inferences: InferenceBatchType) -> Iterable[Any]:
    raise NotImplementedError(type(self))

  def run_inference(self, batch: ExampleBatchType) -> InferenceBatchType:
    raise NotImplementedError(type(self))

I'm guessing it's not necessary to have the type variables if you're not converting to and from batches? But how do we structure this if we want to leverage Batched DoFns in the future? Though it's probably not necessary to do the batching explicitly if we can leverage something like @DoFn.yields_batches? @TheNeuralBit what are your thoughts?

Contributor Author

How about doing that change as part of https://issues.apache.org/jira/browse/BEAM-14044. I suppose this sort of feature would depend a lot on how we implement batching more specifically per framework. Let me add Robert as a reviewer at this point and see what he thinks we should do for batching.

  return platform.system().startswith('CYGWIN_NT')


class _Clock(object):
Contributor

Do we want a test for Clock?

Contributor Author

Since the clock is very environment-specific, it's a little harder to unit test. However, I do think I should change this (it's based on the TFX version) and make it mockable so we can write better unit tests for the metrics.

Contributor

Agreed, I'd like to be able to do basic validation for Clock.

Member

Is it necessary to define our own clock abstraction? The solution in BatchElements is just to provide a kwarg allowing tests to provide a mock to override time.time:

def __init__(
    self,
    min_batch_size=1,
    max_batch_size=10000,
    target_batch_overhead=.05,
    target_batch_duration_secs=1,
    variance=0.25,
    clock=time.time):

If we do need this (maybe for the fine-grained clock?) it would be nice to make it a general Beam solution as this is a common need.
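
As a rough illustration of the kwarg-injection pattern described above (the helper names here are hypothetical, not from Beam or this PR):

```python
import time


class FakeClock:
  """Deterministic stand-in for time.time: advances by a fixed step per call."""
  def __init__(self, start=0.0, step=0.5):
    self._now = start
    self._step = step

  def __call__(self):
    self._now += self._step
    return self._now


def timed_call(fn, clock=time.time):
  """Runs fn and returns (result, elapsed_seconds)."""
  start = clock()
  result = fn()
  return result, clock() - start


# In a unit test, inject the fake for deterministic latency assertions:
result, elapsed = timed_call(lambda: 42, clock=FakeClock())
assert (result, elapsed) == (42, 0.5)
```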

Contributor

I'm all for clock having the same API as time.time (i.e. a callable that returns the number of seconds as a floating point). We can have various implementations as needed for more precision but wouldn't need to add new concepts/APIs (and would be consistent with what we do for BatchElements).

Contributor Author

I want to punt on this change for now and talk to the TFX team as to why they did this, and convince them to use the cleaner time.time solution if they have no reason not to.

https://issues.apache.org/jira/browse/BEAM-14255

I spent some time experimenting with this. AFAICT it's possible we lose 1 microsecond of precision in a measurement 50% of the time if we use the floating point time, instead of the nanosecond interface.

The cleanest would be to have no abstractions at all and just use time.
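
For illustration, the two interfaces being compared (a sketch of the precision argument; time.time_ns is standard since Python 3.7):

```python
import time

# time.time() returns float seconds. A float64 carries ~15-16 significant
# digits, so at ~1.6e9 seconds since the epoch its resolution is a few
# hundred nanoseconds, and converting to integer microseconds can round.
micros_from_float = int(time.time() * 1_000_000)

# time.time_ns() returns an integer nanosecond count, so truncating to
# microseconds loses nothing beyond the intended precision.
micros_from_ns = time.time_ns() // 1_000
```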

Contributor

OK, please reference this jira in a TODO in the code.

Contributor Author

Done.

Member

Is it more complex than that they just wanted nanosecond precision?

Contributor Author
@ryanthompson591 ryanthompson591 Apr 6, 2022

Nope, it's not more complex than that; also, this immediately gets converted to microsecond precision.

I'm going to talk with them about simplifying this. My intuition is that converting time.time() to microseconds is adequate for their needs.

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
  return (
      pcoll
      # TODO: Hook into the batching DoFn APIs.
Contributor

Let's file a JIRA for this

Member

+1, please assign this to me

Contributor Author

https://issues.apache.org/jira/browse/BEAM-14044

Go ahead and take this when we are ready.

Contributor

I made one yesterday but forgot to update this comment 😆 I don't think I have delete permissions to get rid of it, though.

Contributor Author

Don't delete it, just mark it as a duplicate.

@yeandy
Contributor

yeandy commented Mar 2, 2022

CC: @kevingg

@ryanthompson591
Contributor Author

R: @tvalentyn @robertwb
CC: @kerrydc

Comment on lines 143 to 144

    inference_generator = self._inference_runner.run_inference(
        examples, self._model)
Contributor

The time saved is probably negligible, but should we put start_time and inference_latency around just inference_generator, instead of also around the manipulations w/ the keys and wrapping of PredictionResult? Technically, the operations w/ keys and PredictionResult are not related to the pure inference call.

Contributor Author

I'm mostly indifferent; I can see the advantages of both. I can move it down for now, and we can move it later if we're unhappy.
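
A standalone sketch of the narrowed timing scope being suggested (hypothetical helper, not the PR code):

```python
import time


def run_with_latency(run_inference, examples, model, clock=time.time):
  """Times only the inference call; key handling stays outside the window.

  list() materializes the generator so lazy inference work is counted.
  """
  start = clock()
  predictions = list(run_inference(examples, model))
  return predictions, clock() - start
```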

    self._model_loader = model_loader
    self._inference_runner = inference_runner
    self._shared_model_handle = shared.Shared()
    # TODO: Compute a good metrics namespace
Contributor

Will this be a future ticket? Or is it just a reminder for this PR? Maybe we could use run_inference_metrics?

Contributor Author

I think we can put this in one of the framework-defined classes. TFX has a namespace where they define a lot of implementation details. Currently their namespace reads bulkinference. The namespace they define is very model-specific.
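
For context, Beam metrics are keyed by a (namespace, name) pair, so a framework-defined class could simply pass its own namespace. A sketch, using the namespace suggested above (not a settled value):

```python
from apache_beam.metrics import Metrics

_METRICS_NAMESPACE = 'run_inference_metrics'  # suggested, not final

num_inferences = Metrics.counter(_METRICS_NAMESPACE, 'num_inferences')
inference_latency = Metrics.distribution(
    _METRICS_NAMESPACE, 'inference_latency_micros')

num_inferences.inc()
inference_latency.update(1234)  # e.g. microseconds for one batch
```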

Comment on lines 135 to 142

    has_keys = isinstance(batch[0], tuple)
    start_time = self._clock.get_current_time_in_microseconds()
    if has_keys:
      examples = [example for _, example in batch]
      keys = [key for key, _ in batch]
    else:
      examples = batch
      keys = None
Contributor

Just for a little more understanding of logic, can we add a comment above like "Separating examples from keys"?

Contributor Author

Yeah, I added a few comments. The TFX one's logic is a little tricky to follow. Hopefully how keys are separated and recombined makes sense now with a few more comments.

_SECOND_TO_MICROSECOND = 1000000


def _unbatch(maybe_keyed_batches: Any):
Contributor

s/Any/Tuple[Any, Any] ?

Contributor Author

Done
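
For context, a sketch of what _unbatch might do under the suggested annotation (an assumption based on the surrounding discussion, not the exact PR code):

```python
from typing import Any, Iterable, Tuple


def _unbatch(maybe_keyed_batches: Tuple[Any, Any]) -> Iterable[Any]:
  """Flattens a (keys, results) pair back into per-element outputs.

  If keys is None the batch was unkeyed and results are emitted as-is;
  otherwise each key is re-paired with its prediction.
  """
  keys, results = maybe_keyed_batches
  if keys is None:
    return results
  return zip(keys, results)
```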

from apache_beam.testing.test_pipeline import TestPipeline


class MockModel:
Contributor

nit: sounds like a FakeModel rather than MockModel, ditto below.

Contributor Author

Done.
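
The distinction behind the nit: a fake is a real, simplified implementation with canned behavior, whereas a mock records and verifies interactions. For illustration:

```python
class FakeModel:
  """A trivial working model for tests; no mocking framework involved."""
  def predict(self, example):
    return example + 1
```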

    return self._shared_model_handle.acquire(load)

  def setup(self):
    super().setup()
Contributor

nit: it's a no-op.

Contributor Author

Removed.

ryanthompson591 and others added 2 commits March 8, 2022 09:31
typo fix valentyn's suggestion

Co-authored-by: tvalentyn <tvalentyn@users.noreply.github.com>
@ryanthompson591
Contributor Author

The failing tests don't seem related to this change, though I'm not familiar with codecov/patch.

@TheNeuralBit
Member

@ryanthompson591 it looks like you picked up a bunch of commits (from master?) and now GitHub is indicating there are merge conflicts. Can you pull out just your commits and update this branch?

@robertwb
Contributor

robertwb commented Apr 8, 2022

@TheNeuralBit I agree that there are still improvements that can be made to both the public and private APIs. I would declare even the public API here to be non-final, and we should iterate on it (pull requests welcome) before declaring it as such. It will be easier to parallelize the work here once the initial PR is in.

@ryanthompson591
Contributor Author

@TheNeuralBit How about now? I tried to merge with master. I only see two new files added. What merge conflict message is given?

@TheNeuralBit
Member

Run Python_PVR_Flink PreCommit

@TheNeuralBit
Member

No, it's not showing any merge conflicts now. The only blocker is Python PreCommit. That seems to be failing at HEAD, sadly.

@TheNeuralBit
Member

Run Python PreCommit

@TheNeuralBit
Member

Filed BEAM-14288 for the PreCommit issue

@TheNeuralBit
Member

Run Python PreCommit

@TheNeuralBit
Member

Run Python_PVR_Flink PreCommit

@TheNeuralBit
Member

Run PythonDocker PreCommit

@TheNeuralBit TheNeuralBit merged commit dffa7c1 into apache:master Apr 12, 2022