diff --git a/docs/themes/book b/docs/themes/book index 6090fddebdf72..a486adf8462c0 160000 --- a/docs/themes/book +++ b/docs/themes/book @@ -1 +1 @@ -Subproject commit 6090fddebdf7272995c9cef36edf0cff61e261b9 +Subproject commit a486adf8462c0abfc9034436ddd72927d6656809 diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml index b658b1d8b8969..6999ed3179ce5 100644 --- a/flink-connectors/flink-connector-pulsar/pom.xml +++ b/flink-connectors/flink-connector-pulsar/pom.xml @@ -36,7 +36,7 @@ under the License. jar - 2.10.1 + 2.10.2 0.6.1 diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java index bb09315e9157f..c0f236c60c6c0 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java @@ -155,6 +155,9 @@ private void writeObject(ObjectOutputStream oos) throws IOException { oos.writeUTF(entry.getKey()); oos.writeUTF(entry.getValue()); } + + // Timestamp + oos.writeLong(schemaInfo.getTimestamp()); } private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { @@ -177,7 +180,16 @@ private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IO properties.put(ois.readUTF(), ois.readUTF()); } - this.schemaInfo = new SchemaInfoImpl(name, schemaBytes, type, properties); + // Timestamp + long timestamp = ois.readLong(); + + this.schemaInfo = SchemaInfoImpl.builder() + .name(name) + .schema(schemaBytes) + .type(type) + .properties(properties) + .timestamp(timestamp) + .build(); this.schema = createSchema(schemaInfo); } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java index 4b1f7ee47bcbc..fea80a7c6b50c 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java @@ -181,8 +181,13 @@ public static SchemaInfo encodeClassInfo(SchemaInfo schemaInfo, Class typeCla Map properties = new HashMap<>(schemaInfo.getProperties()); properties.put(CLASS_INFO_PLACEHOLDER, typeClass.getName()); - return new SchemaInfoImpl( - schemaInfo.getName(), schemaInfo.getSchema(), schemaInfo.getType(), properties); + return SchemaInfoImpl.builder() + .name(schemaInfo.getName()) + .schema(schemaInfo.getSchema()) + .type(schemaInfo.getType()) + .timestamp(schemaInfo.getTimestamp()) + .properties(properties) + .build(); } @SuppressWarnings("unchecked")