From 54dddb815304a82596b6ff72c2d1c7f4cb40865f Mon Sep 17 00:00:00 2001 From: Karl Higley Date: Sat, 18 Mar 2023 17:23:53 -0400 Subject: [PATCH 1/2] Integrate Tensorflow op with session-based TF from Merlin Models --- .../integration/tf/test_transformer_model.py | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 tests/integration/tf/test_transformer_model.py diff --git a/tests/integration/tf/test_transformer_model.py b/tests/integration/tf/test_transformer_model.py new file mode 100644 index 000000000..3c85a825e --- /dev/null +++ b/tests/integration/tf/test_transformer_model.py @@ -0,0 +1,118 @@ +# +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pytest + +tf = pytest.importorskip("tensorflow") + +triton = pytest.importorskip("merlin.systems.triton") + +tritonclient = pytest.importorskip("tritonclient") +grpcclient = pytest.importorskip("tritonclient.grpc") + +import merlin.models.tf as mm # noqa +from merlin.datasets.synthetic import generate_data # noqa +from merlin.io import Dataset +from merlin.schema import Tags # noqa +from merlin.systems.dag import Ensemble # noqa +from merlin.systems.dag.ops.tensorflow import PredictTensorflow # noqa +from merlin.systems.triton.utils import run_ensemble_on_tritonserver # noqa + + +def test_serve_tf_with_libtensorflow(tmpdir): + + # =========================================== + # Generate training data + # =========================================== + + train = generate_data("sequence-testing", num_rows=100) + + # len(train.schema.column_names) = 11 + + # =========================================== + # Build, train, test, and JIT the model + # =========================================== + + seq_schema = train.schema.select_by_tag(Tags.SEQUENCE).select_by_tag(Tags.CATEGORICAL) + + target = train.schema.select_by_tag(Tags.ITEM_ID).column_names[0] + predict_last = mm.SequencePredictLast(schema=seq_schema, target=target) + + input_schema = seq_schema + output_schema = seq_schema.select_by_name(target) + + train = Dataset(train.to_ddf(columns=input_schema.column_names).compute()) + train.schema = input_schema + loader = mm.Loader(train, batch_size=16, shuffle=False) + + # TODO: Figure out why the model signature picks up everything from the dataloader, not just the columns it uses + # TODO: Figure out why the outputs are wrong vs the schema (why doesn't models provide an output schema?) + # TODO: Fix the suffixes + # - is it always `name` and `name_1`? + # - what happened to `name_1` and `name_2`? Is this caused by Tensorflow versions? + # - where are we using `__values`/`__nnzs`? + # - Just the dataloader? Elsewhere in Systems? + # TODO: Make the ensemble config line up with the model config + + d_model = 48 + query_encoder = mm.Encoder( + mm.InputBlockV2( + input_schema, + embeddings=mm.Embeddings( + input_schema.select_by_tag(Tags.CATEGORICAL), sequence_combiner=None + ), + ), + mm.MLPBlock([d_model]), + mm.GPT2Block(d_model=d_model, n_head=2, n_layer=2), + tf.keras.layers.Lambda(lambda x: tf.reduce_mean(x, axis=1)), + ) + + model = mm.RetrievalModelV2( + query=query_encoder, + output=mm.ContrastiveOutput(output_schema, negative_samplers="in-batch"), + ) + + model.compile(metrics={}) + model.fit(loader, epochs=1, pre=predict_last) + + # =========================================== + # Build a simple Ensemble graph + # =========================================== + tf_op = input_schema.column_names >> PredictTensorflow( + model.query_encoder, input_schema, output_schema + ) + + # len(input_schema.column_names) = 2 + # len(model.input_schema.column_names) = 17 + + ensemble = Ensemble(tf_op, input_schema) + ens_config, node_configs = ensemble.export(str(tmpdir)) + + # =========================================== + # Create Request Data + # =========================================== + + data = generate_data("sequence-testing", num_rows=1) + request_df = data.compute() + + # =========================================== + # Send request to Triton and check response + # =========================================== + response = run_ensemble_on_tritonserver( + tmpdir, input_schema, request_df, output_schema.column_names, node_configs[0].name + ) + + assert response From 97636cdf9a7a6e29e53f1f83bc38faf3938fa9a6 Mon Sep 17 00:00:00 2001 From: Karl Higley Date: Sat, 18 Mar 2023 18:04:08 -0400 Subject: [PATCH 2/2] Make the test pass --- merlin/systems/dag/ops/tensorflow.py | 11 ++++++++- .../dag/runtimes/triton/ops/operator.py | 4 ++-- .../integration/tf/test_transformer_model.py | 23 ++++--------------- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/merlin/systems/dag/ops/tensorflow.py b/merlin/systems/dag/ops/tensorflow.py index eeaecc53b..d81b691ff 100644 --- a/merlin/systems/dag/ops/tensorflow.py +++ b/merlin/systems/dag/ops/tensorflow.py @@ -182,6 +182,15 @@ def _ensure_input_spec_includes_names(model): def _build_schema_from_signature(signature): schema = Schema() for col_name, col in signature.items(): - col_schema = ColumnSchema(col_name, dtype=col.dtype.as_numpy_dtype, dims=col.shape) + if "__offsets" in col_name or "__values" in col_name: + col_name = col_name.replace("__offsets", "").replace("__values", "") + col_values_sig = signature[f"{col_name}__values"] + col_offsets_sig = signature[f"{col_name}__offsets"] + col_dtype = col_values_sig.dtype.as_numpy_dtype + col_dims = (col_offsets_sig.shape[0], None) + else: + col_dtype = col.dtype.as_numpy_dtype + col_dims = col.shape + col_schema = ColumnSchema(col_name, dtype=col_dtype, dims=col_dims) schema.column_schemas[col_name] = col_schema return schema diff --git a/merlin/systems/dag/runtimes/triton/ops/operator.py b/merlin/systems/dag/runtimes/triton/ops/operator.py index f9cfbe2df..d22c901ce 100644 --- a/merlin/systems/dag/runtimes/triton/ops/operator.py +++ b/merlin/systems/dag/runtimes/triton/ops/operator.py @@ -207,12 +207,12 @@ def add_model_param(params, paramclass, col_schema, dims=None): paramclass( name=col_schema.name + "__values", data_type=_convert_dtype(col_schema.dtype), - dims=dims, + dims=dims[1:], ) ) params.append( paramclass( - name=col_schema.name + "__offsets", data_type=model_config.TYPE_INT32, dims=dims + name=col_schema.name + "__offsets", data_type=model_config.TYPE_INT32, dims=[-1] ) ) else: diff --git a/tests/integration/tf/test_transformer_model.py b/tests/integration/tf/test_transformer_model.py index 3c85a825e..fd35b1664 100644 --- a/tests/integration/tf/test_transformer_model.py +++ b/tests/integration/tf/test_transformer_model.py @@ -25,14 +25,14 @@ import merlin.models.tf as mm # noqa from merlin.datasets.synthetic import generate_data # noqa -from merlin.io import Dataset +from merlin.io import Dataset # noqa from merlin.schema import Tags # noqa from merlin.systems.dag import Ensemble # noqa from merlin.systems.dag.ops.tensorflow import PredictTensorflow # noqa from merlin.systems.triton.utils import run_ensemble_on_tritonserver # noqa -def test_serve_tf_with_libtensorflow(tmpdir): +def test_serve_tf_session_based_with_libtensorflow(tmpdir): # =========================================== # Generate training data @@ -40,10 +40,8 @@ def test_serve_tf_with_libtensorflow(tmpdir): train = generate_data("sequence-testing", num_rows=100) - # len(train.schema.column_names) = 11 - # =========================================== - # Build, train, test, and JIT the model + # Build and train the model # =========================================== seq_schema = train.schema.select_by_tag(Tags.SEQUENCE).select_by_tag(Tags.CATEGORICAL) @@ -58,15 +56,6 @@ def test_serve_tf_with_libtensorflow(tmpdir): train.schema = input_schema loader = mm.Loader(train, batch_size=16, shuffle=False) - # TODO: Figure out why the model signature picks up everything from the dataloader, not just the columns it uses - # TODO: Figure out why the outputs are wrong vs the schema (why doesn't models provide an output schema?) - # TODO: Fix the suffixes - # - is it always `name` and `name_1`? - # - what happened to `name_1` and `name_2`? Is this caused by Tensorflow versions? - # - where are we using `__values`/`__nnzs`? - # - Just the dataloader? Elsewhere in Systems? - # TODO: Make the ensemble config line up with the model config - d_model = 48 query_encoder = mm.Encoder( mm.InputBlockV2( @@ -95,9 +84,6 @@ def test_serve_tf_with_libtensorflow(tmpdir): model.query_encoder, input_schema, output_schema ) - # len(input_schema.column_names) = 2 - # len(model.input_schema.column_names) = 17 - ensemble = Ensemble(tf_op, input_schema) ens_config, node_configs = ensemble.export(str(tmpdir)) @@ -112,7 +98,8 @@ def test_serve_tf_with_libtensorflow(tmpdir): # Send request to Triton and check response # =========================================== response = run_ensemble_on_tritonserver( - tmpdir, input_schema, request_df, output_schema.column_names, node_configs[0].name + tmpdir, input_schema, request_df, ["output_1"], node_configs[0].name ) assert response + assert len(response["output_1"][0]) == d_model