From 8a3883f387407d71a9165b5eb00d8aa9c8c660d3 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 5 Apr 2019 13:55:49 +0200 Subject: [PATCH] Also test registration of standard coders. --- .../coders/standard_coders_test.py | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index 36f1d89adfe1..79c06f409e34 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -30,7 +30,8 @@ import yaml from apache_beam.coders import coder_impl -from apache_beam.coders import coders +from apache_beam.portability.api import beam_runner_api_pb2 +from apache_beam.runners import pipeline_context from apache_beam.transforms import window from apache_beam.transforms.window import IntervalWindow from apache_beam.utils import windowed_value @@ -55,19 +56,6 @@ def _load_test_cases(test_yaml): class StandardCodersTest(unittest.TestCase): - _urn_to_coder_class = { - 'beam:coder:bytes:v1': coders.BytesCoder, - 'beam:coder:varint:v1': coders.VarIntCoder, - 'beam:coder:kv:v1': lambda k, v: coders.TupleCoder((k, v)), - 'beam:coder:interval_window:v1': coders.IntervalWindowCoder, - 'beam:coder:iterable:v1': lambda t: coders.IterableCoder(t), - 'beam:coder:global_window:v1': coders.GlobalWindowCoder, - 'beam:coder:windowed_value:v1': - lambda v, w: coders.WindowedValueCoder(v, w), - 'beam:coder:timer:v1': coders._TimerCoder, - 'beam:coder:double:v1': coders.FloatCoder, - } - _urn_to_json_value_parser = { 'beam:coder:bytes:v1': lambda x: x.encode('utf-8'), 'beam:coder:varint:v1': lambda x: x, @@ -128,8 +116,16 @@ def assert_equal(actual, expected): value) def parse_coder(self, spec): - return self._urn_to_coder_class[spec['urn']]( - *[self.parse_coder(c) for c in spec.get('components', ())]) + context = pipeline_context.PipelineContext() + coder_id = str(hash(str(spec))) + component_ids = [context.coders.get_id(self.parse_coder(c)) + for c in spec.get('components', ())] + context.coders.put_proto(coder_id, beam_runner_api_pb2.Coder( + spec=beam_runner_api_pb2.SdkFunctionSpec( + spec=beam_runner_api_pb2.FunctionSpec( + urn=spec['urn'], payload=spec.get('payload'))), + component_coder_ids=component_ids)) + return context.coders.get_by_id(coder_id) def json_value_parser(self, coder_spec): component_parsers = [