-
Notifications
You must be signed in to change notification settings - Fork 735
Expand file tree
/
Copy pathProxyConnection.java
More file actions
839 lines (759 loc) · 26.7 KB
/
ProxyConnection.java
File metadata and controls
839 lines (759 loc) · 26.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
package org.littleshoot.proxy.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import org.littleshoot.proxy.HttpFilters;
import javax.net.ssl.SSLEngine;
import static org.littleshoot.proxy.impl.ConnectionState.*;
/**
* <p>
* Base class for objects that represent a connection to/from our proxy.
* </p>
* <p>
* A ProxyConnection models a bidirectional message flow on top of a Netty
* {@link Channel}.
* </p>
* <p>
* The {@link #read(Object)} method is called whenever a new message arrives on
* the underlying socket.
* </p>
* <p>
* The {@link #write(Object)} method can be called by anyone wanting to write
* data out of the connection.
* </p>
* <p>
* ProxyConnection has a lifecycle and its current state within that lifecycle
* is recorded as a {@link ConnectionState}. The allowed states and transitions
* vary a little depending on the concrete implementation of ProxyConnection.
* However, all ProxyConnections share the following lifecycle events:
* </p>
*
* <ul>
* <li>{@link #connected()} - Once the underlying channel is active, the
* ProxyConnection is considered connected and moves into
* {@link ConnectionState#AWAITING_INITIAL}. The Channel is recorded at this
* time for later referencing.</li>
* <li>{@link #disconnected()} - When the underlying channel goes inactive, the
* ProxyConnection moves into {@link ConnectionState#DISCONNECTED}</li>
* <li>{@link #becameWritable()} - When the underlying channel becomes
* writeable, this callback is invoked.</li>
* </ul>
*
* <p>
* By default, incoming data on the underlying channel is automatically read and
* passed to the {@link #read(Object)} method. Reading can be stopped and
* resumed using {@link #stopReading()} and {@link #resumeReading()}.
* </p>
*
* @param <I>
* the type of "initial" message. This will be either
* {@link HttpResponse} or {@link HttpRequest}.
*/
abstract class ProxyConnection<I extends HttpObject> extends
SimpleChannelInboundHandler<Object> {
protected final ProxyConnectionLogger LOG = new ProxyConnectionLogger(this);
protected final DefaultHttpProxyServer proxyServer;
protected final boolean runsAsSslClient;
protected volatile ChannelHandlerContext ctx;
protected volatile Channel channel;
private volatile ConnectionState currentState;
private volatile boolean tunneling = false;
protected volatile long lastReadTime = 0;
/**
* If using encryption, this holds our {@link SSLEngine}.
*/
protected volatile SSLEngine sslEngine;
/**
* Construct a new ProxyConnection.
*
* @param initialState
* the state in which this connection starts out
* @param proxyServer
* the {@link DefaultHttpProxyServer} in which we're running
* @param runsAsSslClient
* determines whether this connection acts as an SSL client or
* server (determines who does the handshake)
*/
protected ProxyConnection(ConnectionState initialState,
DefaultHttpProxyServer proxyServer,
boolean runsAsSslClient) {
become(initialState);
this.proxyServer = proxyServer;
this.runsAsSslClient = runsAsSslClient;
}
/***************************************************************************
* Reading
**************************************************************************/
/**
* Read is invoked automatically by Netty as messages arrive on the socket.
*
* @param msg
*/
protected void read(Object msg) {
LOG.debug("Reading: {}", msg);
lastReadTime = System.currentTimeMillis();
if (tunneling) {
// In tunneling mode, this connection is simply shoveling bytes
readRaw((ByteBuf) msg);
} else {
// If not tunneling, then we are always dealing with HttpObjects.
readHTTP((HttpObject) msg);
}
}
/**
* Handles reading {@link HttpObject}s.
*
* @param httpObject
*/
@SuppressWarnings("unchecked")
private void readHTTP(HttpObject httpObject) {
ConnectionState nextState = getCurrentState();
switch (getCurrentState()) {
case AWAITING_INITIAL:
if (httpObject instanceof HttpMessage) {
nextState = readHTTPInitial((I) httpObject);
} else {
// Similar to the AWAITING_PROXY_AUTHENTICATION case below, we may enter an AWAITING_INITIAL
// state if the proxy responded to an earlier request with a 502 or 504 response, or a short-circuit
// response from a filter. The client may have sent some chunked HttpContent associated with the request
// after the short-circuit response was sent. We can safely drop them.
LOG.debug("Dropping message because HTTP object was not an HttpMessage. HTTP object may be orphaned content from a short-circuited response. Message: {}", httpObject);
}
break;
case AWAITING_CHUNK:
HttpContent chunk = (HttpContent) httpObject;
readHTTPChunk(chunk);
nextState = ProxyUtils.isLastChunk(chunk) ? AWAITING_INITIAL
: AWAITING_CHUNK;
break;
case AWAITING_PROXY_AUTHENTICATION:
if (httpObject instanceof HttpRequest) {
// Once we get an HttpRequest, try to process it as usual
nextState = readHTTPInitial((I) httpObject);
} else {
// Anything that's not an HttpRequest that came in while
// we're pending authentication gets dropped on the floor. This
// can happen if the connected host already sent us some chunks
// (e.g. from a POST) after an initial request that turned out
// to require authentication.
}
break;
case CONNECTING:
LOG.warn("Attempted to read from connection that's in the process of connecting. This shouldn't happen.");
break;
case NEGOTIATING_CONNECT:
LOG.debug("Attempted to read from connection that's in the process of negotiating an HTTP CONNECT. This is probably the LastHttpContent of a chunked CONNECT.");
break;
case AWAITING_CONNECT_OK:
LOG.warn("AWAITING_CONNECT_OK should have been handled by ProxyToServerConnection.read()");
break;
case HANDSHAKING:
LOG.warn(
"Attempted to read from connection that's in the process of handshaking. This shouldn't happen.",
channel);
break;
case DISCONNECT_REQUESTED:
case DISCONNECTED:
LOG.info("Ignoring message since the connection is closed or about to close");
break;
}
become(nextState);
}
/**
* Implement this to handle reading the initial object (e.g.
* {@link HttpRequest} or {@link HttpResponse}).
*
* @param httpObject
* @return
*/
protected abstract ConnectionState readHTTPInitial(I httpObject);
/**
* Implement this to handle reading a chunk in a chunked transfer.
*
* @param chunk
*/
protected abstract void readHTTPChunk(HttpContent chunk);
/**
* Implement this to handle reading a raw buffer as they are used in HTTP
* tunneling.
*
* @param buf
*/
protected abstract void readRaw(ByteBuf buf);
/***************************************************************************
* Writing
**************************************************************************/
/**
* This method is called by users of the ProxyConnection to send stuff out
* over the socket.
*
* @param msg
*/
void write(Object msg) {
if (msg instanceof ReferenceCounted) {
LOG.debug("Retaining reference counted message");
((ReferenceCounted) msg).retain();
}
doWrite(msg);
}
void doWrite(Object msg) {
LOG.debug("Writing: {}", msg);
try {
if (msg instanceof HttpObject) {
writeHttp((HttpObject) msg);
} else {
writeRaw((ByteBuf) msg);
}
} finally {
LOG.debug("Wrote: {}", msg);
}
}
/**
* Writes HttpObjects to the connection asynchronously.
*
* @param httpObject
*/
protected void writeHttp(HttpObject httpObject) {
if (ProxyUtils.isLastChunk(httpObject)) {
channel.write(httpObject);
LOG.debug("Writing an empty buffer to signal the end of our chunked transfer");
writeToChannel(Unpooled.EMPTY_BUFFER);
} else {
writeToChannel(httpObject);
}
}
/**
* Writes raw buffers to the connection.
*
* @param buf
*/
protected void writeRaw(ByteBuf buf) {
writeToChannel(buf);
}
protected ChannelFuture writeToChannel(final Object msg) {
return channel.writeAndFlush(msg);
}
/***************************************************************************
* Lifecycle
**************************************************************************/
/**
* This method is called as soon as the underlying {@link Channel} is
* connected. Note that for proxies with complex {@link ConnectionFlow}s
* that include SSL handshaking and other such things, just because the
* {@link Channel} is connected doesn't mean that our connection is fully
* established.
*/
protected void connected() {
LOG.debug("Connected");
}
/**
* This method is called as soon as the underlying {@link Channel} becomes
* disconnected.
*/
protected void disconnected() {
become(DISCONNECTED);
LOG.debug("Disconnected");
}
/**
* This method is called when the underlying {@link Channel} times out due
* to an idle timeout.
*/
protected void timedOut() {
disconnect();
}
/**
* <p>
* Enables tunneling on this connection by dropping the HTTP related
* encoders and decoders, as well as idle timers.
* </p>
*
* <p>
* Note - the work is done on the {@link ChannelHandlerContext}'s executor
* because {@link ChannelPipeline#remove(String)} can deadlock if called
* directly.
* </p>
*/
protected ConnectionFlowStep StartTunneling = new ConnectionFlowStep(
this, NEGOTIATING_CONNECT) {
@Override
boolean shouldSuppressInitialRequest() {
return true;
}
protected Future execute() {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.get("encoder") != null) {
pipeline.remove("encoder");
}
if (pipeline.get("responseWrittenMonitor") != null) {
pipeline.remove("responseWrittenMonitor");
}
if (pipeline.get("decoder") != null) {
pipeline.remove("decoder");
}
if (pipeline.get("requestReadMonitor") != null) {
pipeline.remove("requestReadMonitor");
}
tunneling = true;
return channel.newSucceededFuture();
} catch (Throwable t) {
return channel.newFailedFuture(t);
}
}
};
/**
* Encrypts traffic on this connection with SSL/TLS.
*
* @param sslEngine
* the {@link SSLEngine} for doing the encryption
* @param authenticateClients
* determines whether to authenticate clients or not
* @return a Future for when the SSL handshake has completed
*/
protected Future<Channel> encrypt(SSLEngine sslEngine,
boolean authenticateClients) {
return encrypt(ctx.pipeline(), sslEngine, authenticateClients);
}
/**
* Encrypts traffic on this connection with SSL/TLS.
*
* @param pipeline
* the ChannelPipeline on which to enable encryption
* @param sslEngine
* the {@link SSLEngine} for doing the encryption
* @param authenticateClients
* determines whether to authenticate clients or not
* @return a Future for when the SSL handshake has completed
*/
protected Future<Channel> encrypt(ChannelPipeline pipeline,
SSLEngine sslEngine,
boolean authenticateClients) {
LOG.debug("Enabling encryption with SSLEngine: {}",
sslEngine);
this.sslEngine = sslEngine;
sslEngine.setUseClientMode(runsAsSslClient);
sslEngine.setNeedClientAuth(authenticateClients);
if (null != channel) {
channel.config().setAutoRead(true);
}
SslHandler handler = new SslHandler(sslEngine);
if(pipeline.get("ssl") == null) {
pipeline.addFirst("ssl", handler);
} else {
// The second SSL handler is added to handle the case
// where the proxy (running as MITM) has to chain with
// another SSL enabled proxy. The second SSL handler
// is to perform SSL with the server.
pipeline.addAfter("ssl", "sslWithServer", handler);
}
return handler.handshakeFuture();
}
/**
* Encrypts the channel using the provided {@link SSLEngine}.
*
* @param sslEngine
* the {@link SSLEngine} for doing the encryption
*/
protected ConnectionFlowStep EncryptChannel(
final SSLEngine sslEngine) {
return new ConnectionFlowStep(this, HANDSHAKING) {
@Override
boolean shouldExecuteOnEventLoop() {
return false;
}
@Override
protected Future<?> execute() {
return encrypt(sslEngine, !runsAsSslClient);
}
};
};
/**
* Enables decompression and aggregation of content, which is useful for
* certain types of filtering activity.
*
* @param pipeline
* @param numberOfBytesToBuffer
*/
protected void aggregateContentForFiltering(ChannelPipeline pipeline,
int numberOfBytesToBuffer) {
pipeline.addLast("inflater", new HttpContentDecompressor());
pipeline.addLast("aggregator", new HttpObjectAggregator(
numberOfBytesToBuffer));
}
/**
* Callback that's invoked if this connection becomes saturated.
*/
protected void becameSaturated() {
LOG.debug("Became saturated");
}
/**
* Callback that's invoked when this connection becomes writeable again.
*/
protected void becameWritable() {
LOG.debug("Became writeable");
}
/**
* Override this to handle exceptions that occurred during asynchronous
* processing on the {@link Channel}.
*
* @param cause
*/
protected void exceptionCaught(Throwable cause) {
}
/***************************************************************************
* State/Management
**************************************************************************/
/**
* Disconnects. This will wait for pending writes to be flushed before
* disconnecting.
*
* @return Future<Void> for when we're done disconnecting. If we weren't
* connected, this returns null.
*/
Future<Void> disconnect() {
if (channel == null) {
return null;
} else {
final Promise<Void> promise = channel.newPromise();
writeToChannel(Unpooled.EMPTY_BUFFER).addListener(
new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(
Future<? super Void> future)
throws Exception {
closeChannel(promise);
}
});
return promise;
}
}
private void closeChannel(final Promise<Void> promise) {
channel.close().addListener(
new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(
Future<? super Void> future)
throws Exception {
if (future
.isSuccess()) {
promise.setSuccess(null);
} else {
promise.setFailure(future
.cause());
}
};
});
}
/**
* Indicates whether or not this connection is saturated (i.e. not
* writeable).
*
* @return
*/
protected boolean isSaturated() {
return !this.channel.isWritable();
}
/**
* Utility for checking current state.
*
* @param state
* @return
*/
protected boolean is(ConnectionState state) {
return currentState == state;
}
/**
* If this connection is currently in the process of going through a
* {@link ConnectionFlow}, this will return true.
*
* @return
*/
protected boolean isConnecting() {
return currentState.isPartOfConnectionFlow();
}
/**
* Udpates the current state to the given value.
*
* @param state
*/
protected void become(ConnectionState state) {
this.currentState = state;
}
protected ConnectionState getCurrentState() {
return currentState;
}
public boolean isTunneling() {
return tunneling;
}
public SSLEngine getSslEngine() {
return sslEngine;
}
/**
* Call this to stop reading.
*/
protected void stopReading() {
LOG.debug("Stopped reading");
this.channel.config().setAutoRead(false);
}
/**
* Call this to resume reading.
*/
protected void resumeReading() {
LOG.debug("Resumed reading");
this.channel.config().setAutoRead(true);
}
/**
* Request the ProxyServer for Filters.
*
* By default, no-op filters are returned by DefaultHttpProxyServer.
* Subclasses of ProxyConnection can change this behaviour.
*
* @param httpRequest
* Filter attached to the give HttpRequest (if any)
* @return
*/
protected HttpFilters getHttpFiltersFromProxyServer(HttpRequest httpRequest) {
return proxyServer.getFiltersSource().filterRequest(httpRequest, ctx);
}
ProxyConnectionLogger getLOG() {
return LOG;
}
/***************************************************************************
* Adapting the Netty API
**************************************************************************/
@Override
protected final void channelRead0(ChannelHandlerContext ctx, Object msg)
throws Exception {
read(msg);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
try {
this.ctx = ctx;
this.channel = ctx.channel();
this.proxyServer.registerChannel(ctx.channel());
} finally {
super.channelRegistered(ctx);
}
}
/**
* Only once the Netty Channel is active to we recognize the ProxyConnection
* as connected.
*/
@Override
public final void channelActive(ChannelHandlerContext ctx) throws Exception {
try {
connected();
} finally {
super.channelActive(ctx);
}
}
/**
* As soon as the Netty Channel is inactive, we recognize the
* ProxyConnection as disconnected.
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
disconnected();
} finally {
super.channelInactive(ctx);
}
}
@Override
public final void channelWritabilityChanged(ChannelHandlerContext ctx)
throws Exception {
LOG.debug("Writability changed. Is writable: {}", channel.isWritable());
try {
if (this.channel.isWritable()) {
becameWritable();
} else {
becameSaturated();
}
} finally {
super.channelWritabilityChanged(ctx);
}
}
@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
exceptionCaught(cause);
}
/**
* <p>
* We're looking for {@link IdleStateEvent}s to see if we need to
* disconnect.
* </p>
*
* <p>
* Note - we don't care what kind of IdleState we got. Thanks to <a
* href="https://github.com/qbast">qbast</a> for pointing this out.
* </p>
*/
@Override
public final void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
try {
if (evt instanceof IdleStateEvent) {
LOG.debug("Got idle");
timedOut();
}
} finally {
super.userEventTriggered(ctx, evt);
}
}
/***************************************************************************
* Activity Tracking/Statistics
**************************************************************************/
/**
* Utility handler for monitoring bytes read on this connection.
*/
@Sharable
protected abstract class BytesReadMonitor extends
ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
if (msg instanceof ByteBuf) {
bytesRead(((ByteBuf) msg).readableBytes());
}
} catch (Throwable t) {
LOG.warn("Unable to record bytesRead", t);
} finally {
super.channelRead(ctx, msg);
}
}
protected abstract void bytesRead(int numberOfBytes);
}
/**
* Utility handler for monitoring requests read on this connection.
*/
@Sharable
protected abstract class RequestReadMonitor extends
ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
if (msg instanceof HttpRequest) {
requestRead((HttpRequest) msg);
}
} catch (Throwable t) {
LOG.warn("Unable to record bytesRead", t);
} finally {
super.channelRead(ctx, msg);
}
}
protected abstract void requestRead(HttpRequest httpRequest);
}
/**
* Utility handler for monitoring responses read on this connection.
*/
@Sharable
protected abstract class ResponseReadMonitor extends
ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
if (msg instanceof HttpResponse) {
responseRead((HttpResponse) msg);
}
} catch (Throwable t) {
LOG.warn("Unable to record bytesRead", t);
} finally {
super.channelRead(ctx, msg);
}
}
protected abstract void responseRead(HttpResponse httpResponse);
}
/**
* Utility handler for monitoring bytes written on this connection.
*/
@Sharable
protected abstract class BytesWrittenMonitor extends
ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise)
throws Exception {
try {
if (msg instanceof ByteBuf) {
bytesWritten(((ByteBuf) msg).readableBytes());
}
} catch (Throwable t) {
LOG.warn("Unable to record bytesRead", t);
} finally {
super.write(ctx, msg, promise);
}
}
protected abstract void bytesWritten(int numberOfBytes);
}
/**
* Utility handler for monitoring requests written on this connection.
*/
@Sharable
protected abstract class RequestWrittenMonitor extends
ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise)
throws Exception {
HttpRequest originalRequest = null;
if (msg instanceof HttpRequest) {
originalRequest = (HttpRequest) msg;
}
if (null != originalRequest) {
requestWriting(originalRequest);
}
super.write(ctx, msg, promise);
if (null != originalRequest) {
requestWritten(originalRequest);
}
if (msg instanceof HttpContent) {
contentWritten((HttpContent) msg);
}
}
/**
* Invoked immediately before an HttpRequest is written.
*/
protected abstract void requestWriting(HttpRequest httpRequest);
/**
* Invoked immediately after an HttpRequest has been sent.
*/
protected abstract void requestWritten(HttpRequest httpRequest);
/**
* Invoked immediately after an HttpContent has been sent.
*/
protected abstract void contentWritten(HttpContent httpContent);
}
/**
* Utility handler for monitoring responses written on this connection.
*/
@Sharable
protected abstract class ResponseWrittenMonitor extends
ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise)
throws Exception {
try {
if (msg instanceof HttpResponse) {
responseWritten(((HttpResponse) msg));
}
} catch (Throwable t) {
LOG.warn("Error while invoking responseWritten callback", t);
} finally {
super.write(ctx, msg, promise);
}
}
protected abstract void responseWritten(HttpResponse httpResponse);
}
}