Skip to content

Commit baf15da

Browse files
authored
All feast types are supported by avro decoder (#25)
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent c288af8 commit baf15da

File tree

6 files changed

+317
-110
lines changed

6 files changed

+317
-110
lines changed

serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java

Lines changed: 134 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,20 @@
4747
import java.io.ByteArrayOutputStream;
4848
import java.io.File;
4949
import java.io.IOException;
50+
import java.nio.ByteBuffer;
5051
import java.time.Duration;
5152
import java.util.HashMap;
5253
import java.util.List;
5354
import java.util.Map;
5455
import java.util.stream.Collectors;
5556
import org.apache.avro.Schema;
5657
import org.apache.avro.SchemaBuilder;
58+
import org.apache.avro.generic.GenericData;
5759
import org.apache.avro.generic.GenericDatumWriter;
5860
import org.apache.avro.generic.GenericRecord;
5961
import org.apache.avro.generic.GenericRecordBuilder;
60-
import org.apache.avro.io.*;
62+
import org.apache.avro.io.Encoder;
63+
import org.apache.avro.io.EncoderFactory;
6164
import org.junit.ClassRule;
6265
import org.junit.jupiter.api.AfterAll;
6366
import org.junit.jupiter.api.BeforeAll;
@@ -140,9 +143,6 @@ static void globalSetup() throws IOException {
140143
+ ":"
141144
+ environment.getServicePort("bigtable_1", BIGTABLE_PORT);
142145
channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
143-
TransportChannelProvider channelProvider =
144-
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
145-
NoCredentialsProvider credentialsProvider = NoCredentialsProvider.create();
146146

147147
/** Feast resource creation Workflow */
148148
String projectName = "default";
@@ -210,9 +210,6 @@ static void globalSetup() throws IOException {
210210
ImmutableList<String> compoundColumnFamilies =
211211
ImmutableList.of(rideMerchantFeatureTableName, metadataColumnFamily);
212212

213-
createTable(channelProvider, credentialsProvider, btTableName, columnFamilies);
214-
createTable(channelProvider, credentialsProvider, compoundBtTableName, compoundColumnFamilies);
215-
216213
/** Single Entity Ingestion Workflow */
217214
Schema ftSchema =
218215
SchemaBuilder.record("DriverData")
@@ -319,7 +316,9 @@ private static void createTable(
319316
for (String columnFamily : columnFamilies) {
320317
createTableRequest.addFamily(columnFamily);
321318
}
322-
client.createTable(createTableRequest);
319+
if (!client.exists(tableName)) {
320+
client.createTable(createTableRequest);
321+
}
323322
}
324323
}
325324

@@ -348,17 +347,31 @@ private static byte[] createEntityValue(
348347
return entityFeatureValue;
349348
}
350349

350+
private static byte[] schemaReference(Schema schema) {
351+
return Hashing.murmur3_32().hashBytes(schema.toString().getBytes()).asBytes();
352+
}
353+
351354
private static void ingestData(
352355
String featureTableName,
353356
String btTableName,
354357
byte[] btEntityFeatureKey,
355358
byte[] btEntityFeatureValue,
356359
byte[] btSchemaKey,
357-
Schema btSchema) {
360+
Schema btSchema)
361+
throws IOException {
358362
String emptyQualifier = "";
359363
String avroQualifier = "avro";
360364
String metadataColumnFamily = "metadata";
361365

366+
TransportChannelProvider channelProvider =
367+
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
368+
NoCredentialsProvider credentialsProvider = NoCredentialsProvider.create();
369+
createTable(
370+
channelProvider,
371+
credentialsProvider,
372+
btTableName,
373+
ImmutableList.of(featureTableName, metadataColumnFamily));
374+
362375
// Update Compound Entity-Feature Row
363376
client.mutateRow(
364377
RowMutation.create(btTableName, ByteString.copyFrom(btEntityFeatureKey))
@@ -601,6 +614,118 @@ public void shouldReturnCorrectRowCount() {
601614
assertEquals(expectedFieldValuesList, featureResponse.getFieldValuesList());
602615
}
603616

617+
@Test
618+
public void shouldSupportAllFeastTypes() throws IOException {
619+
EntityProto.EntitySpecV2 entitySpec =
620+
EntityProto.EntitySpecV2.newBuilder()
621+
.setName("entity")
622+
.setDescription("")
623+
.setValueType(ValueProto.ValueType.Enum.STRING)
624+
.build();
625+
TestUtils.applyEntity(coreClient, "default", entitySpec);
626+
627+
ImmutableMap<String, ValueProto.ValueType.Enum> allTypesFeatures =
628+
new ImmutableMap.Builder<String, ValueProto.ValueType.Enum>()
629+
.put("f_int64", ValueProto.ValueType.Enum.INT64)
630+
.put("f_int32", ValueProto.ValueType.Enum.INT32)
631+
.put("f_float", ValueProto.ValueType.Enum.FLOAT)
632+
.put("f_double", ValueProto.ValueType.Enum.DOUBLE)
633+
.put("f_string", ValueProto.ValueType.Enum.STRING)
634+
.put("f_bytes", ValueProto.ValueType.Enum.BYTES)
635+
.put("f_bool", ValueProto.ValueType.Enum.BOOL)
636+
.put("f_int64_list", ValueProto.ValueType.Enum.INT64_LIST)
637+
.put("f_int32_list", ValueProto.ValueType.Enum.INT32_LIST)
638+
.put("f_float_list", ValueProto.ValueType.Enum.FLOAT_LIST)
639+
.put("f_double_list", ValueProto.ValueType.Enum.DOUBLE_LIST)
640+
.put("f_string_list", ValueProto.ValueType.Enum.STRING_LIST)
641+
.put("f_bytes_list", ValueProto.ValueType.Enum.BYTES_LIST)
642+
.put("f_bool_list", ValueProto.ValueType.Enum.BOOL_LIST)
643+
.build();
644+
645+
TestUtils.applyFeatureTable(
646+
coreClient, "default", "all_types", ImmutableList.of("entity"), allTypesFeatures, 7200);
647+
648+
Schema schema =
649+
SchemaBuilder.record("AllTypesRecord")
650+
.namespace("")
651+
.fields()
652+
.requiredLong("f_int64")
653+
.requiredInt("f_int32")
654+
.requiredFloat("f_float")
655+
.requiredDouble("f_double")
656+
.requiredString("f_string")
657+
.requiredBytes("f_bytes")
658+
.requiredBoolean("f_bool")
659+
.name("f_int64_list")
660+
.type(SchemaBuilder.array().items(SchemaBuilder.builder().longType()))
661+
.noDefault()
662+
.name("f_int32_list")
663+
.type(SchemaBuilder.array().items(SchemaBuilder.builder().intType()))
664+
.noDefault()
665+
.name("f_float_list")
666+
.type(SchemaBuilder.array().items(SchemaBuilder.builder().floatType()))
667+
.noDefault()
668+
.name("f_double_list")
669+
.type(SchemaBuilder.array().items(SchemaBuilder.builder().doubleType()))
670+
.noDefault()
671+
.name("f_string_list")
672+
.type(SchemaBuilder.array().items(SchemaBuilder.builder().stringType()))
673+
.noDefault()
674+
.name("f_bytes_list")
675+
.type(SchemaBuilder.array().items(SchemaBuilder.builder().bytesType()))
676+
.noDefault()
677+
.name("f_bool_list")
678+
.type(SchemaBuilder.array().items(SchemaBuilder.builder().booleanType()))
679+
.noDefault()
680+
.endRecord();
681+
682+
GenericData.Record record =
683+
new GenericRecordBuilder(schema)
684+
.set("f_int64", 10L)
685+
.set("f_int32", 10)
686+
.set("f_float", 10.0)
687+
.set("f_double", 10.0D)
688+
.set("f_string", "test")
689+
.set("f_bytes", ByteBuffer.wrap("test".getBytes()))
690+
.set("f_bool", true)
691+
.set("f_int64_list", ImmutableList.of(10L))
692+
.set("f_int32_list", ImmutableList.of(10))
693+
.set("f_float_list", ImmutableList.of(10.0))
694+
.set("f_double_list", ImmutableList.of(10.0D))
695+
.set("f_string_list", ImmutableList.of("test"))
696+
.set("f_bytes_list", ImmutableList.of(ByteBuffer.wrap("test".getBytes())))
697+
.set("f_bool_list", ImmutableList.of(true))
698+
.build();
699+
700+
ValueProto.Value entity = DataGenerator.createStrValue("key");
701+
702+
ingestData(
703+
"all_types",
704+
"default__entity",
705+
entity.getStringVal().getBytes(),
706+
createEntityValue(schema, schemaReference(schema), record),
707+
createSchemaKey(schemaReference(schema)),
708+
schema);
709+
710+
GetOnlineFeaturesRequestV2 onlineFeatureRequest =
711+
TestUtils.createOnlineFeatureRequest(
712+
"default",
713+
allTypesFeatures.keySet().stream()
714+
.map(
715+
f ->
716+
FeatureReferenceV2.newBuilder()
717+
.setFeatureTable("all_types")
718+
.setName(f)
719+
.build())
720+
.collect(Collectors.toList()),
721+
ImmutableList.of(DataGenerator.createEntityRow("entity", entity, 100)));
722+
GetOnlineFeaturesResponse featureResponse =
723+
servingStub.getOnlineFeaturesV2(onlineFeatureRequest);
724+
725+
assert featureResponse.getFieldValues(0).getStatusesMap().values().stream()
726+
.allMatch(status -> status.equals(GetOnlineFeaturesResponse.FieldStatus.PRESENT));
727+
}
728+
604729
@TestConfiguration
605730
public static class TestConfig {
606731
@Bean

storage/api/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@
6161
<version>3.9</version>
6262
</dependency>
6363

64+
<dependency>
65+
<groupId>org.apache.avro</groupId>
66+
<artifactId>avro</artifactId>
67+
<version>1.10.2</version>
68+
</dependency>
69+
6470
<dependency>
6571
<groupId>junit</groupId>
6672
<artifactId>junit</artifactId>
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2021 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.storage.api.retriever;
18+
19+
import com.google.protobuf.ByteString;
20+
import com.google.protobuf.Timestamp;
21+
import feast.proto.serving.ServingAPIProto;
22+
import feast.proto.types.ValueProto;
23+
import java.nio.ByteBuffer;
24+
import java.util.stream.Collectors;
25+
import org.apache.avro.generic.GenericData;
26+
import org.apache.avro.util.Utf8;
27+
28+
public class AvroFeature implements Feature {
29+
private final ServingAPIProto.FeatureReferenceV2 featureReference;
30+
31+
private final Timestamp eventTimestamp;
32+
33+
private final Object featureValue;
34+
35+
public AvroFeature(
36+
ServingAPIProto.FeatureReferenceV2 featureReference,
37+
Timestamp eventTimestamp,
38+
Object featureValue) {
39+
this.featureReference = featureReference;
40+
this.eventTimestamp = eventTimestamp;
41+
this.featureValue = featureValue;
42+
}
43+
44+
/**
45+
* Casts feature value of Object type based on Feast valueType. Empty object i.e new Object() is
46+
* interpreted as VAL_NOT_SET Feast valueType.
47+
*
48+
* @param valueType Feast valueType of feature as specified in FeatureSpec
49+
* @return ValueProto.Value representation of feature
50+
*/
51+
@Override
52+
public ValueProto.Value getFeatureValue(ValueProto.ValueType.Enum valueType) {
53+
ValueProto.Value finalValue;
54+
55+
try {
56+
switch (valueType) {
57+
case STRING:
58+
finalValue =
59+
ValueProto.Value.newBuilder().setStringVal(((Utf8) featureValue).toString()).build();
60+
break;
61+
case INT32:
62+
finalValue = ValueProto.Value.newBuilder().setInt32Val((Integer) featureValue).build();
63+
break;
64+
case INT64:
65+
finalValue = ValueProto.Value.newBuilder().setInt64Val((Long) featureValue).build();
66+
break;
67+
case DOUBLE:
68+
finalValue = ValueProto.Value.newBuilder().setDoubleVal((Double) featureValue).build();
69+
break;
70+
case FLOAT:
71+
finalValue = ValueProto.Value.newBuilder().setFloatVal((Float) featureValue).build();
72+
break;
73+
case BYTES:
74+
finalValue =
75+
ValueProto.Value.newBuilder()
76+
.setBytesVal(ByteString.copyFrom(((ByteBuffer) featureValue).array()))
77+
.build();
78+
break;
79+
case BOOL:
80+
finalValue = ValueProto.Value.newBuilder().setBoolVal((Boolean) featureValue).build();
81+
break;
82+
case STRING_LIST:
83+
finalValue =
84+
ValueProto.Value.newBuilder()
85+
.setStringListVal(
86+
ValueProto.StringList.newBuilder()
87+
.addAllVal(
88+
((GenericData.Array<Utf8>) featureValue)
89+
.stream().map(Utf8::toString).collect(Collectors.toList()))
90+
.build())
91+
.build();
92+
break;
93+
case INT64_LIST:
94+
finalValue =
95+
ValueProto.Value.newBuilder()
96+
.setInt64ListVal(
97+
ValueProto.Int64List.newBuilder()
98+
.addAllVal(((GenericData.Array<Long>) featureValue))
99+
.build())
100+
.build();
101+
break;
102+
case INT32_LIST:
103+
finalValue =
104+
ValueProto.Value.newBuilder()
105+
.setInt32ListVal(
106+
ValueProto.Int32List.newBuilder()
107+
.addAllVal(((GenericData.Array<Integer>) featureValue))
108+
.build())
109+
.build();
110+
break;
111+
case FLOAT_LIST:
112+
finalValue =
113+
ValueProto.Value.newBuilder()
114+
.setFloatListVal(
115+
ValueProto.FloatList.newBuilder()
116+
.addAllVal(((GenericData.Array<Float>) featureValue))
117+
.build())
118+
.build();
119+
break;
120+
case DOUBLE_LIST:
121+
finalValue =
122+
ValueProto.Value.newBuilder()
123+
.setDoubleListVal(
124+
ValueProto.DoubleList.newBuilder()
125+
.addAllVal(((GenericData.Array<Double>) featureValue))
126+
.build())
127+
.build();
128+
break;
129+
case BOOL_LIST:
130+
finalValue =
131+
ValueProto.Value.newBuilder()
132+
.setBoolListVal(
133+
ValueProto.BoolList.newBuilder()
134+
.addAllVal(((GenericData.Array<Boolean>) featureValue))
135+
.build())
136+
.build();
137+
break;
138+
case BYTES_LIST:
139+
finalValue =
140+
ValueProto.Value.newBuilder()
141+
.setBytesListVal(
142+
ValueProto.BytesList.newBuilder()
143+
.addAllVal(
144+
((GenericData.Array<ByteBuffer>) featureValue)
145+
.stream()
146+
.map(byteBuffer -> ByteString.copyFrom(byteBuffer.array()))
147+
.collect(Collectors.toList()))
148+
.build())
149+
.build();
150+
break;
151+
default:
152+
throw new RuntimeException("FeatureType is not supported");
153+
}
154+
} catch (ClassCastException e) {
155+
// Feature type has changed
156+
finalValue = ValueProto.Value.newBuilder().build();
157+
}
158+
159+
return finalValue;
160+
}
161+
162+
@Override
163+
public ServingAPIProto.FeatureReferenceV2 getFeatureReference() {
164+
return this.featureReference;
165+
}
166+
167+
@Override
168+
public Timestamp getEventTimestamp() {
169+
return this.eventTimestamp;
170+
}
171+
}

0 commit comments

Comments
 (0)