Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2022,7 +2022,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())

this.apmTracingEnabled = configProvider.getBoolean(GeneralConfig.APM_TRACING_ENABLED, true);

this.jdkSocketEnabled = configProvider.getBoolean(JDK_SOCKET_ENABLED, false);
this.jdkSocketEnabled = configProvider.getBoolean(JDK_SOCKET_ENABLED, true);

log.debug("New instance: {}", this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ final class TunnelingJdkSocket extends Socket {
private InetSocketAddress inetSocketAddress;

private SocketChannel unixSocketChannel;
private Selector selector;

private int timeout;
private boolean shutIn;
Expand Down Expand Up @@ -90,6 +91,9 @@ public synchronized int getSoTimeout() throws SocketException {

@Override
public void connect(final SocketAddress endpoint) throws IOException {
if (endpoint == null) {
throw new IllegalArgumentException("Endpoint cannot be null");
}
if (isClosed()) {
throw new SocketException("Socket is closed");
}
Expand All @@ -105,6 +109,12 @@ public void connect(final SocketAddress endpoint) throws IOException {
// https://github.com/jnr/jnr-unixsocket/blob/master/src/main/java/jnr/unixsocket/UnixSocket.java#L89-L97
@Override
public void connect(final SocketAddress endpoint, final int timeout) throws IOException {
if (endpoint == null) {
throw new IllegalArgumentException("Endpoint cannot be null");
}
if (timeout < 0) {
throw new IllegalArgumentException("Timeout cannot be negative");
}
if (isClosed()) {
throw new SocketException("Socket is closed");
}
Expand All @@ -122,17 +132,19 @@ public SocketChannel getChannel() {

@Override
public void setSendBufferSize(int size) throws SocketException {
if (size <= 0) {
throw new IllegalArgumentException("Invalid send buffer size");
}
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (size < 0) {
throw new IllegalArgumentException("Invalid send buffer size");
}
sendBufferSize = size;
try {
unixSocketChannel.setOption(java.net.StandardSocketOptions.SO_SNDBUF, size);
sendBufferSize = size;
} catch (IOException e) {
throw new SocketException("Failed to set send buffer size");
SocketException se = new SocketException("Failed to set send buffer size socket option");
se.initCause(e);
throw se;
}
}

Expand All @@ -149,17 +161,19 @@ public int getSendBufferSize() throws SocketException {

@Override
public void setReceiveBufferSize(int size) throws SocketException {
if (size <= 0) {
throw new IllegalArgumentException("Invalid receive buffer size");
}
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (size < 0) {
throw new IllegalArgumentException("Invalid receive buffer size");
}
receiveBufferSize = size;
try {
unixSocketChannel.setOption(java.net.StandardSocketOptions.SO_RCVBUF, size);
receiveBufferSize = size;
} catch (IOException e) {
throw new SocketException("Failed to set receive buffer size");
SocketException se = new SocketException("Failed to set receive buffer size socket option");
se.initCause(e);
throw se;
}
}

Expand Down Expand Up @@ -196,14 +210,14 @@ public InputStream getInputStream() throws IOException {
throw new SocketException("Socket input is shutdown");
}

if (selector == null) {
selector = Selector.open();
unixSocketChannel.configureBlocking(false);
unixSocketChannel.register(selector, SelectionKey.OP_READ);
}

return new InputStream() {
private final ByteBuffer buffer = ByteBuffer.allocate(getStreamBufferSize());
private final Selector selector = Selector.open();

{
unixSocketChannel.configureBlocking(false);
unixSocketChannel.register(selector, SelectionKey.OP_READ);
}

@Override
public int read() throws IOException {
Expand All @@ -213,6 +227,9 @@ public int read() throws IOException {

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (isInputShutdown()) {
return -1;
}
buffer.clear();

int readyChannels = selector.select(timeout);
Expand Down Expand Up @@ -241,7 +258,7 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public void close() throws IOException {
selector.close();
TunnelingJdkSocket.this.close();
}
};
}
Expand All @@ -254,7 +271,7 @@ public OutputStream getOutputStream() throws IOException {
if (!isConnected()) {
throw new SocketException("Socket is not connected");
}
if (isInputShutdown()) {
if (isOutputShutdown()) {
throw new SocketException("Socket output is shutdown");
}

Expand All @@ -267,12 +284,19 @@ public void write(int b) throws IOException {

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (isOutputShutdown()) {
throw new IOException("Stream closed");
}
ByteBuffer buffer = ByteBuffer.wrap(b, off, len);

while (buffer.hasRemaining()) {
unixSocketChannel.write(buffer);
}
}

@Override
public void close() throws IOException {
TunnelingJdkSocket.this.close();
}
};
}

Expand Down Expand Up @@ -308,6 +332,9 @@ public void shutdownOutput() throws IOException {

@Override
public InetAddress getInetAddress() {
if (!isConnected()) {
return null;
}
return inetSocketAddress.getAddress();
}

Expand All @@ -316,8 +343,31 @@ public void close() throws IOException {
if (isClosed()) {
return;
}
if (null != unixSocketChannel) {
unixSocketChannel.close();
// Ignore possible exceptions so that we continue closing the socket
try {
if (!isInputShutdown()) {
shutdownInput();
}
} catch (IOException e) {
}
try {
if (!isOutputShutdown()) {
shutdownOutput();
}
} catch (IOException e) {
}
try {
if (selector != null) {
selector.close();
selector = null;
}
} catch (IOException e) {
}
try {
if (unixSocketChannel != null) {
unixSocketChannel.close();
}
} catch (IOException e) {
}
closed = true;
}
Expand Down
Loading