Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -150,41 +149,32 @@ public static String getPEMEncodedString(X509Certificate certificate)
* containing multiple certificates. To get all certificates, use
* {@link #getCertPathFromPemEncodedString(String)}.
*
* @param pemEncodedString - PEM encoded String.
* @param pemEncoded - PEM encoded String.
* @return X509Certificate - Certificate.
* @throws CertificateException - Thrown on Failure.
*/
public static X509Certificate getX509Certificate(String pemEncodedString)
public static X509Certificate getX509Certificate(String pemEncoded)
throws CertificateException {
return getX509Certificate(pemEncodedString, Function.identity());
}

public static <E extends Exception> X509Certificate getX509Certificate(
String pemEncoded, Function<CertificateException, E> convertor)
throws E {
// ByteArrayInputStream.close(), which is a noop, can be safely ignored.
final ByteArrayInputStream input = new ByteArrayInputStream(
pemEncoded.getBytes(DEFAULT_CHARSET));
return readX509Certificate(input, convertor);
return readX509Certificate(input);
}

private static <E extends Exception> X509Certificate readX509Certificate(
InputStream input, Function<CertificateException, E> convertor)
throws E {
try {
return (X509Certificate) getCertFactory().generateCertificate(input);
} catch (CertificateException e) {
throw convertor.apply(e);
public static X509Certificate readX509Certificate(InputStream input) throws CertificateException {
final Certificate cert = getCertFactory().generateCertificate(input);
if (cert instanceof X509Certificate) {
return (X509Certificate) cert;
}
throw new CertificateException("Certificate is not a X509Certificate: " + cert.getClass() + ", " + cert);
}

public static X509Certificate readX509Certificate(InputStream input)
throws IOException {
return readX509Certificate(input, CertificateCodec::toIOException);
}

public static IOException toIOException(CertificateException e) {
return new IOException("Failed to engineGenerateCertificate", e);
public static X509Certificate readX509Certificate(String pemEncoded) throws IOException {
try {
return getX509Certificate(pemEncoded);
} catch (CertificateException e) {
throw new IOException("Failed to getX509Certificate from " + pemEncoded, e);
}
}

public static X509Certificate firstCertificateFrom(CertPath certificatePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ default boolean supportCodecBuffer() {
* @param allocator To allocate a buffer.
* @return a buffer storing the serialized bytes.
*/
default CodecBuffer toCodecBuffer(@Nonnull T object,
CodecBuffer.Allocator allocator) throws IOException {
default CodecBuffer toCodecBuffer(@Nonnull T object, CodecBuffer.Allocator allocator) throws CodecException {
throw new UnsupportedOperationException();
}

Expand All @@ -63,8 +62,7 @@ default CodecBuffer toCodecBuffer(@Nonnull T object,
* @param object The object to be serialized.
* @return a direct buffer storing the serialized bytes.
*/
default CodecBuffer toDirectCodecBuffer(@Nonnull T object)
throws IOException {
default CodecBuffer toDirectCodecBuffer(@Nonnull T object) throws CodecException {
return toCodecBuffer(object, CodecBuffer.Allocator.getDirect());
}

Expand All @@ -74,8 +72,7 @@ default CodecBuffer toDirectCodecBuffer(@Nonnull T object)
* @param object The object to be serialized.
* @return a heap buffer storing the serialized bytes.
*/
default CodecBuffer toHeapCodecBuffer(@Nonnull T object)
throws IOException {
default CodecBuffer toHeapCodecBuffer(@Nonnull T object) throws CodecException {
return toCodecBuffer(object, CodecBuffer.Allocator.getHeap());
}

Expand All @@ -85,7 +82,7 @@ default CodecBuffer toHeapCodecBuffer(@Nonnull T object)
* @param buffer Storing the serialized bytes of an object.
* @return the deserialized object.
*/
default T fromCodecBuffer(@Nonnull CodecBuffer buffer) throws IOException {
default T fromCodecBuffer(@Nonnull CodecBuffer buffer) throws CodecException {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,16 +462,16 @@ CodecBuffer put(ToIntFunction<ByteBuffer> source) {
* @param source put bytes to an {@link OutputStream} and return the size.
* The returned size must be non-null and non-negative.
* @return this object.
* @throws IOException in case the source throws an {@link IOException}.
* @throws CodecException in case the source throws an {@link IOException}.
*/
public CodecBuffer put(
CheckedFunction<OutputStream, Integer, IOException> source)
throws IOException {
public CodecBuffer put(CheckedFunction<OutputStream, Integer, IOException> source) throws CodecException {
assertRefCnt(1);
final int w = buf.writerIndex();
final int size;
try (ByteBufOutputStream out = new ByteBufOutputStream(buf)) {
size = source.apply(out);
} catch (IOException e) {
throw new CodecException("Failed to apply source to " + this + ", " + source, e);
}
final ByteBuf returned = buf.setIndex(buf.readerIndex(), w + size);
Preconditions.assertSame(buf, returned, "buf");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db;

import java.io.IOException;

/**
* Exceptions thrown from the {@link Codec} subclasses.
*/
public class CodecException extends IOException {
public CodecException(String message, Throwable cause) {
super(message, cause);
}

public CodecException(String message) {
super(message);
}

public CodecException() {
super();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import jakarta.annotation.Nonnull;
import java.io.IOException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.function.CheckedFunction;

/**
Expand All @@ -29,10 +30,11 @@
*/
public class DelegatedCodec<T, DELEGATE> implements Codec<T> {
private final Codec<DELEGATE> delegate;
private final CheckedFunction<DELEGATE, T, IOException> forward;
private final CheckedFunction<T, DELEGATE, IOException> backward;
private final CheckedFunction<DELEGATE, T, CodecException> forward;
private final CheckedFunction<T, DELEGATE, CodecException> backward;
private final Class<T> clazz;
private final CopyType copyType;
private final String name;

/**
* Construct a {@link Codec} using the given delegate.
Expand All @@ -43,20 +45,21 @@ public class DelegatedCodec<T, DELEGATE> implements Codec<T> {
* @param copyType How to {@link #copyObject(Object)}?
*/
public DelegatedCodec(Codec<DELEGATE> delegate,
CheckedFunction<DELEGATE, T, IOException> forward,
CheckedFunction<T, DELEGATE, IOException> backward,
CheckedFunction<DELEGATE, T, CodecException> forward,
CheckedFunction<T, DELEGATE, CodecException> backward,
Class<T> clazz, CopyType copyType) {
this.delegate = delegate;
this.forward = forward;
this.backward = backward;
this.clazz = clazz;
this.copyType = copyType;
this.name = JavaUtils.getClassSimpleName(getTypeClass()) + "-delegate: " + delegate;
}

/** The same as new DelegatedCodec(delegate, forward, backward, DEEP). */
public DelegatedCodec(Codec<DELEGATE> delegate,
CheckedFunction<DELEGATE, T, IOException> forward,
CheckedFunction<T, DELEGATE, IOException> backward,
CheckedFunction<DELEGATE, T, CodecException> forward,
CheckedFunction<T, DELEGATE, CodecException> backward,
Class<T> clazz) {
this(delegate, forward, backward, clazz, CopyType.DEEP);
}
Expand All @@ -72,14 +75,12 @@ public final boolean supportCodecBuffer() {
}

@Override
public final CodecBuffer toCodecBuffer(@Nonnull T message,
CodecBuffer.Allocator allocator) throws IOException {
public final CodecBuffer toCodecBuffer(@Nonnull T message, CodecBuffer.Allocator allocator) throws CodecException {
return delegate.toCodecBuffer(backward.apply(message), allocator);
}

@Override
public final T fromCodecBuffer(@Nonnull CodecBuffer buffer)
throws IOException {
public final T fromCodecBuffer(@Nonnull CodecBuffer buffer) throws CodecException {
return forward.apply(delegate.fromCodecBuffer(buffer));
}

Expand Down Expand Up @@ -109,11 +110,16 @@ public T copyObject(T message) {
// Deep copy
try {
return forward.apply(delegate.copyObject(backward.apply(message)));
} catch (IOException e) {
} catch (CodecException e) {
throw new IllegalStateException("Failed to copyObject", e);
}
}

@Override
public String toString() {
return name;
}

/** How to {@link #copyObject(Object)}? */
public enum CopyType {
/** Deep copy -- duplicate the underlying fields of the object. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.ratis.util.function.CheckedFunction;

/**
Expand Down Expand Up @@ -64,24 +65,37 @@ public boolean supportCodecBuffer() {

@Override
public CodecBuffer toCodecBuffer(@Nonnull M message,
CodecBuffer.Allocator allocator) throws IOException {
CodecBuffer.Allocator allocator) throws CodecException {
final int size = message.getSerializedSize();
return allocator.apply(size).put(writeTo(message, size));
}

private CheckedFunction<OutputStream, Integer, IOException> writeTo(
M message, int size) {
return out -> {
message.writeTo(out);
return size;
return new CheckedFunction<OutputStream, Integer, IOException>() {
@Override
public Integer apply(OutputStream out) throws IOException {
message.writeTo(out);
return size;
}

@Override
public String toString() {
return "source: size=" + size + ", message=" + message;
}
};
}

@Override
public M fromCodecBuffer(@Nonnull CodecBuffer buffer)
throws IOException {
try (InputStream in = buffer.getInputStream()) {
throws CodecException {
final InputStream in = buffer.getInputStream();
try {
return parser.parseFrom(in);
} catch (InvalidProtocolBufferException e) {
throw new CodecException("Failed to parse " + buffer + " for " + getTypeClass(), e);
} finally {
IOUtils.closeQuietly(in);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,29 +62,31 @@ public boolean supportCodecBuffer() {
return true;
}

private ToIntFunction<ByteBuffer> writeTo(M message, int size) {
return buffer -> {
@Override
public CodecBuffer toCodecBuffer(@Nonnull M message, CodecBuffer.Allocator allocator) {
final int size = message.getSerializedSize();
final CodecBuffer codecBuffer = allocator.apply(size);
final ToIntFunction<ByteBuffer> writeTo = buffer -> {
try {
message.writeTo(CodedOutputStream.newInstance(buffer));
} catch (IOException e) {
// The buffer was allocated with the message size, it should never throw an IOException
throw new IllegalStateException(
"Failed to writeTo: message=" + message, e);
}
return size;
};
codecBuffer.put(writeTo);
return codecBuffer;
}

@Override
public CodecBuffer toCodecBuffer(@Nonnull M message,
CodecBuffer.Allocator allocator) {
final int size = message.getSerializedSize();
return allocator.apply(size).put(writeTo(message, size));
}

@Override
public M fromCodecBuffer(@Nonnull CodecBuffer buffer)
throws InvalidProtocolBufferException {
return parser.parseFrom(buffer.asReadOnlyByteBuffer());
public M fromCodecBuffer(@Nonnull CodecBuffer buffer) throws CodecException {
try {
return parser.parseFrom(buffer.asReadOnlyByteBuffer());
} catch (InvalidProtocolBufferException e) {
throw new CodecException("Failed to parse " + buffer + " for " + getTypeClass(), e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,11 @@ public boolean supportCodecBuffer() {
}

@Override
public CodecBuffer toCodecBuffer(@Nonnull String object,
CodecBuffer.Allocator allocator) throws IOException {
public CodecBuffer toCodecBuffer(@Nonnull String object, CodecBuffer.Allocator allocator) throws CodecException {
// allocate a larger buffer to avoid encoding twice.
final int upperBound = getSerializedSizeUpperBound(object);
final CodecBuffer buffer = allocator.apply(upperBound);
buffer.putFromSource(encode(object, null, IOException::new));
buffer.putFromSource(encode(object, null, CodecException::new));
return buffer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ public static List<X509Certificate> convertToX509(
List<X509Certificate> x509Certificates =
new ArrayList<>(pemEncodedCerts.size());
for (String cert : pemEncodedCerts) {
x509Certificates.add(CertificateCodec.getX509Certificate(
cert, CertificateCodec::toIOException));
x509Certificates.add(CertificateCodec.readX509Certificate(cert));
}
return x509Certificates;
}
Expand Down
Loading