From 8c86175fa13d2dd1829b84ef0fffa2f32dfd0bc9 Mon Sep 17 00:00:00 2001 From: Yufan Sheng Date: Thu, 1 Sep 2022 13:25:53 +0800 Subject: [PATCH] PulsarSchema: fix the byte array serialization issues. --- .../pulsar/common/schema/PulsarSchema.java | 99 ++++++++++++------- .../pulsar/common/utils/PulsarSerdeUtils.java | 5 +- 2 files changed, 67 insertions(+), 37 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java index 5fd176e73267b..0d0d11497b793 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java @@ -27,8 +27,12 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -57,11 +61,6 @@ public final class PulsarSchema implements Serializable { private transient Schema schema; private transient SchemaInfo schemaInfo; - private String schemaName; - private byte[] schemaBytes; - private SchemaType schemaType; - private Map schemaProperties; - /** Create serializable pulsar schema for primitive types. */ public PulsarSchema(Schema schema) { SchemaInfo info = schema.getSchemaInfo(); @@ -78,7 +77,8 @@ public PulsarSchema(Schema schema) { // Primitive type information could be reflected from the schema class. Class typeClass = getTemplateType1(schema.getClass()); - setSchemaInfo(encodeClassInfo(info, typeClass)); + this.schemaInfo = encodeClassInfo(info, typeClass); + this.schema = createSchema(schemaInfo); } /** @@ -94,7 +94,8 @@ public PulsarSchema(Schema schema, Class typeClass) { "Key Value Schema should provide the type classes of key and value"); validateSchemaInfo(info); - setSchemaInfo(encodeClassInfo(info, typeClass)); + this.schemaInfo = encodeClassInfo(info, typeClass); + this.schema = createSchema(schemaInfo); } /** Create serializable pulsar schema for key value type. */ @@ -117,37 +118,66 @@ public PulsarSchema( SchemaInfo encodedInfo = encodeKeyValueSchemaInfo(info.getName(), infoKey, infoValue, encodingType); - setSchemaInfo(encodeClassInfo(encodedInfo, KeyValue.class)); - } - - /** Validate the schema for having the required class info. */ - private void setSchemaInfo(SchemaInfo schemaInfo) { - this.schema = createSchema(schemaInfo); - this.schemaInfo = schemaInfo; - - this.schemaName = schemaInfo.getName(); - this.schemaBytes = schemaInfo.getSchema(); - this.schemaType = schemaInfo.getType(); - this.schemaProperties = schemaInfo.getProperties(); + this.schemaInfo = encodeClassInfo(encodedInfo, KeyValue.class); + this.schema = createSchema(this.schemaInfo); } public Schema getPulsarSchema() { - if (schema == null) { - this.schema = createSchema(getSchemaInfo()); - } return schema; } public SchemaInfo getSchemaInfo() { - if (schemaInfo == null) { - this.schemaInfo = - new SchemaInfoImpl(schemaName, schemaBytes, schemaType, schemaProperties); - } return schemaInfo; } public Class getRecordClass() { - return decodeClassInfo(getSchemaInfo()); + return decodeClassInfo(schemaInfo); + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + // Name + oos.writeUTF(schemaInfo.getName()); + + // Schema + byte[] schemaBytes = schemaInfo.getSchema(); + oos.writeInt(schemaBytes.length); + oos.write(schemaBytes); + + // Type + SchemaType type = schemaInfo.getType(); + oos.writeInt(type.getValue()); + + // Properties + Map properties = schemaInfo.getProperties(); + oos.writeInt(properties.size()); + for (Map.Entry entry : properties.entrySet()) { + oos.writeUTF(entry.getKey()); + oos.writeUTF(entry.getValue()); + } + } + + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + // Name + String name = ois.readUTF(); + + // Schema + int byteLen = ois.readInt(); + byte[] schemaBytes = new byte[byteLen]; + ois.readFully(schemaBytes); + + // Type + int typeIdx = ois.readInt(); + SchemaType type = SchemaType.valueOf(typeIdx); + + // Properties + int propSize = ois.readInt(); + Map properties = new HashMap<>(propSize); + for (int i = 0; i < propSize; i++) { + properties.put(ois.readUTF(), ois.readUTF()); + } + + this.schemaInfo = new SchemaInfoImpl(name, schemaBytes, type, properties); + this.schema = createSchema(schemaInfo); } @Override @@ -158,21 +188,24 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - PulsarSchema that = ((PulsarSchema) o); + SchemaInfo that = ((PulsarSchema) o).getPulsarSchema().getSchemaInfo(); - return Objects.equals(schemaType, that.schemaType) - && Arrays.equals(schemaBytes, that.schemaBytes) - && Objects.equals(schemaProperties, that.schemaProperties); + return Objects.equals(schemaInfo.getType(), that.getType()) + && Arrays.equals(schemaInfo.getSchema(), that.getSchema()) + && Objects.equals(schemaInfo.getProperties(), that.getProperties()); } @Override public int hashCode() { - return Objects.hash(schemaType, Arrays.hashCode(schemaBytes), schemaProperties); + return Objects.hash( + schemaInfo.getType(), + Arrays.hashCode(schemaInfo.getSchema()), + schemaInfo.getProperties()); } @Override public String toString() { - return getSchemaInfo().toString(); + return schemaInfo.toString(); } /** diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java index 9f64172a504ef..93609bc720d69 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java @@ -50,10 +50,7 @@ public static void serializeBytes(DataOutputStream out, byte[] bytes) throws IOE public static byte[] deserializeBytes(DataInputStream in) throws IOException { int size = in.readInt(); byte[] bytes = new byte[size]; - int result = in.read(bytes); - if (result < 0) { - throw new IOException("Couldn't deserialize the object, wrong byte buffer."); - } + in.readFully(bytes); return bytes; }