Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions flink-table-store-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,20 @@ under the License.
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,21 @@ public class Committable {

private final Kind kind;

private final byte[] wrappedCommittable;
private final Object wrappedCommittable;

private final int serializerVersion;

public Committable(Kind kind, byte[] wrappedCommittable, int serializerVersion) {
public Committable(Kind kind, Object wrappedCommittable) {
this.kind = kind;
this.wrappedCommittable = wrappedCommittable;
this.serializerVersion = serializerVersion;
}

public Kind kind() {
return kind;
}

public byte[] wrappedCommittable() {
public Object wrappedCommittable() {
return wrappedCommittable;
}

public int serializerVersion() {
return serializerVersion;
}

enum Kind {
FILE((byte) 0),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,80 @@

import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.nio.ByteBuffer;

/** {@link SimpleVersionedSerializer} for {@link Committable}. */
public class CommittableSerializer implements SimpleVersionedSerializer<Committable> {

public static final CommittableSerializer INSTANCE = new CommittableSerializer();
private final FileCommittableSerializer fileCommittableSerializer;

private final SimpleVersionedSerializer<Object> logCommittableSerializer;

public CommittableSerializer(
FileCommittableSerializer fileCommittableSerializer,
SimpleVersionedSerializer<Object> logCommittableSerializer) {
this.fileCommittableSerializer = fileCommittableSerializer;
this.logCommittableSerializer = logCommittableSerializer;
}

@Override
public int getVersion() {
return 1;
}

@Override
public byte[] serialize(Committable committable) {
byte[] wrapped = committable.wrappedCommittable();
public byte[] serialize(Committable committable) throws IOException {
byte[] wrapped;
int version;
switch (committable.kind()) {
case FILE:
version = fileCommittableSerializer.getVersion();
wrapped =
fileCommittableSerializer.serialize(
(FileCommittable) committable.wrappedCommittable());
break;
case LOG:
version = logCommittableSerializer.getVersion();
wrapped = logCommittableSerializer.serialize(committable.wrappedCommittable());
break;
case LOG_OFFSET:
version = 1;
wrapped = ((LogOffsetCommittable) committable.wrappedCommittable()).toBytes();
break;
default:
throw new UnsupportedOperationException("Unsupported kind: " + committable.kind());
}

return ByteBuffer.allocate(1 + wrapped.length + 4)
.put(committable.kind().toByteValue())
.put(wrapped)
.putInt(committable.serializerVersion())
.putInt(version)
.array();
}

@Override
public Committable deserialize(int i, byte[] bytes) {
public Committable deserialize(int i, byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
byte[] wrapped = new byte[bytes.length - 5];
buffer.get(wrapped);
int version = buffer.getInt();
return new Committable(kind, wrapped, version);

Object wrappedCommittable;
switch (kind) {
case FILE:
wrappedCommittable = fileCommittableSerializer.deserialize(version, wrapped);
break;
case LOG:
wrappedCommittable = logCommittableSerializer.deserialize(version, wrapped);
break;
case LOG_OFFSET:
wrappedCommittable = LogOffsetCommittable.fromBytes(wrapped);
break;
default:
throw new UnsupportedOperationException("Unsupported kind: " + kind);
}
return new Committable(kind, wrappedCommittable);
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.store.connector.sink;

import java.nio.ByteBuffer;
import java.util.Objects;

/** Log offset committable for a bucket. */
public class LogOffsetCommittable {
Expand Down Expand Up @@ -48,4 +49,21 @@ public static LogOffsetCommittable fromBytes(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
return new LogOffsetCommittable(buffer.getInt(), buffer.getLong());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LogOffsetCommittable that = (LogOffsetCommittable) o;
return bucket == that.bucket && offset == that.offset;
}

@Override
public int hashCode() {
return Objects.hash(bucket, offset);
}
}
Loading