#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A pipeline that uses the RunInference API to classify MNIST data.

The input is a text file of comma separated ints whose first line is skipped
as a header. In every remaining row the first column is the true label and the
other columns are the pixel values. Each row is parsed, passed through a
scikit-learn model loaded from a pickle file, and written to the output file
as ``<true label>,<prediction>`` so that users can compare the prediction
against the true label.
"""

import argparse
from typing import Iterable
from typing import List
from typing import Tuple

import apache_beam as beam
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def process_input(row: str) -> Tuple[int, List[int]]:
  """Parses one comma separated row into ``(label, pixels)``.

  Args:
    row: a line of text whose comma separated fields are all ints; the first
      field is the label and the remaining fields are pixel values.

  Returns:
    A tuple of the integer label and the list of integer pixel values.

  Raises:
    ValueError: if any field cannot be converted with ``int``.
  """
  fields = row.split(',')
  return int(fields[0]), [int(value) for value in fields[1:]]


class PostProcessor(beam.DoFn):
  """Formats keyed inference results as text.

  Each ``(label, PredictionResult)`` element becomes one comma separated
  string holding the true label followed by the predicted label.
  """
  def process(self, element: Tuple[int, PredictionResult]) -> Iterable[str]:
    true_label, result = element
    yield '{},{}'.format(true_label, result.inference)


def parse_known_args(argv):
  """Parses the arguments this workflow understands.

  Args:
    argv: the command line arguments to parse; ``None`` lets argparse fall
      back to ``sys.argv``.

  Returns:
    The ``(namespace, remaining_args)`` pair from
    ``ArgumentParser.parse_known_args``. The namespace carries ``input``,
    ``output`` and ``model_path``; unrecognised arguments are returned
    untouched in ``remaining_args``.
  """
  # (flag, dest, help) of every required option, in registration order.
  required_options = (
      ('--input_file', 'input', 'text file with comma separated int values.'),
      ('--output', 'output', 'Path to save output predictions.'),
      (
          '--model_path',
          'model_path',
          'Path to load the Sklearn model for Inference.'),
  )
  arg_parser = argparse.ArgumentParser()
  for flag, dest, help_text in required_options:
    arg_parser.add_argument(flag, dest=dest, required=True, help=help_text)
  return arg_parser.parse_known_args(argv)


def run(argv=None, save_main_session=True):
  """Entry point. Defines and runs the pipeline.

  Args:
    argv: command line arguments; the ones not consumed by
      ``parse_known_args`` are forwarded to ``PipelineOptions``.
    save_main_session: value assigned to ``SetupOptions.save_main_session``.
  """
  known_args, pipeline_args = parse_known_args(argv)
  options = PipelineOptions(pipeline_args)
  options.view_as(SetupOptions).save_main_session = save_main_session

  # The elements fed to RunInference are (label, pixels) pairs, i.e. keyed
  # inputs, so the sklearn handler is wrapped in a KeyedModelHandler.
  sklearn_handler = SklearnModelHandlerNumpy(
      model_file_type=ModelFileType.PICKLE, model_uri=known_args.model_path)
  keyed_handler = KeyedModelHandler(sklearn_handler)

  with beam.Pipeline(options=options) as pipeline:
    # The first line of the input file is treated as a header and dropped.
    read_rows = beam.io.ReadFromText(known_args.input, skip_header_lines=1)
    # An empty shard name template keeps the output path free of a shard
    # suffix.
    write_results = beam.io.WriteToText(
        known_args.output,
        shard_name_template='',
        append_trailing_newlines=True)

    _ = (
        pipeline
        | "ReadFromInput" >> read_rows
        | "PreProcessInputs" >> beam.Map(process_input)
        | "RunInference" >> RunInference(keyed_handler)
        | "PostProcessOutputs" >> beam.ParDo(PostProcessor())
        | "WriteOutput" >> write_results)


if __name__ == '__main__':
  run()
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""End-to-End test for Sklearn Inference"""

import logging
import unittest
import uuid

import pytest

from apache_beam.examples.inference import sklearn_mnist_classification
from apache_beam.io.filesystems import FileSystems
from apache_beam.testing.test_pipeline import TestPipeline


def process_outputs(filepath):
  """Reads a text file through Beam's FileSystems and returns its lines.

  Args:
    filepath: path of the file to read (the test passes ``gs://`` paths).

  Returns:
    A list of str, one per line, decoded as UTF-8 and with newline characters
    stripped from both ends.
  """
  with FileSystems.open(filepath) as f:
    raw_lines = f.readlines()
  return [raw_line.decode('utf-8').strip('\n') for raw_line in raw_lines]


@pytest.mark.skip
@pytest.mark.uses_sklearn
@pytest.mark.it_postcommit
class SklearnInference(unittest.TestCase):
  """Runs the sklearn MNIST example and checks its output file.

  The class is currently skipped unconditionally through ``pytest.mark.skip``.
  """
  def test_sklearn_mnist_classification(self):
    test_pipeline = TestPipeline(is_integration_test=False)
    input_file = 'gs://apache-beam-ml/testing/inputs/it_mnist_data.csv'
    output_file_dir = 'gs://temp-storage-for-end-to-end-tests'
    # A fresh uuid directory keeps concurrent runs from sharing an output path.
    output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
    model_path = 'gs://apache-beam-ml/models/mnist_model_svm.pickle'
    # Keys are turned into ``--<key>=<value>`` flags, so they must match the
    # example's flag names exactly; ``input`` would only work through
    # argparse's prefix abbreviation of ``--input_file``.
    extra_opts = {
        'input_file': input_file,
        'output': output_file,
        'model_path': model_path,
    }
    sklearn_mnist_classification.run(
        test_pipeline.get_full_options_as_args(**extra_opts),
        save_main_session=False)
    self.assertTrue(FileSystems.exists(output_file))

    expected_output_filepath = 'gs://apache-beam-ml/testing/expected_outputs/test_sklearn_mnist_classification_actuals.txt'  # pylint: disable=line-too-long
    expected_outputs = process_outputs(expected_output_filepath)
    predicted_outputs = process_outputs(output_file)
    self.assertEqual(len(expected_outputs), len(predicted_outputs))

    # Every line is '<true label>,<prediction>'. Many rows share a true label
    # and the pipeline does not guarantee output order, so a dict keyed by the
    # true label would silently overwrite entries. Compare the lines as
    # multisets instead: same lines, same multiplicities, any order.
    self.assertCountEqual(expected_outputs, predicted_outputs)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.DEBUG)
  unittest.main()
// Scikit-learn RunInference IT tests
// Runs run_integration_test.sh inside the activated virtualenv on the
// TestDirectRunner, restricted to tests collected by the expression
// "uses_sklearn and it_postcommit".
task sklearnInferenceTest {
  dependsOn 'installGcpTest'
  dependsOn ':sdks:python:sdist'
  doLast {
    // Options for run_integration_test.sh, flattened into a single argument
    // string by mapToArgString.
    def cmdArgs = mapToArgString([
        "test_opts": basicTestOpts,
        "suite": "postCommitIT-direct-py${pythonVersionSuffix}",
        "collect": "uses_sklearn and it_postcommit",
        "runner": "TestDirectRunner"
    ])
    // Activate the virtualenv first, then launch the test script in the same
    // shell so it inherits the activated environment.
    def shellCommand = ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    exec {
      executable 'sh'
      args '-c', shellCommand
    }
  }
}

// Add all the RunInference framework IT tests to this gradle task that runs on Direct Runner Post commit suite.
project.tasks.register("inferencePostCommitIT") {
  dependsOn = ['torchInferenceTest', 'sklearnInferenceTest']
}