3636import java .net .Inet6Address ;
3737import java .net .InetSocketAddress ;
3838import java .util .ArrayDeque ;
39+ import java .util .Iterator ;
3940import java .util .Queue ;
4041import java .util .concurrent .TimeUnit ;
4142
@@ -183,6 +184,13 @@ private void initHeapWeights() {
183184
184185 @ Override
185186 public void write (ChannelHandlerContext ctx , Object msg , ChannelPromise promise ) {
187+ if (!this .channel .parent ().eventLoop ().inEventLoop ()) {
188+ // Make sure this runs on correct thread
189+ log .error ("Tried to write packet from wrong thread: {}" , Thread .currentThread ().getName (), new Throwable ());
190+ final Object finalMsg = msg ;
191+ this .channel .parent ().eventLoop ().execute (() -> this .write (ctx , finalMsg , promise ));
192+ return ;
193+ }
186194 if (msg instanceof ByteBuf ) {
187195 msg = new RakMessage ((ByteBuf ) msg );
188196 } else if (!(msg instanceof RakMessage )) {
@@ -521,7 +529,7 @@ private void onIncomingNack(ChannelHandlerContext ctx, RakDatagramPacket datagra
521529 }
522530
523531 this .slidingWindow .onNak (); // TODO: verify this
524- this .sendDatagram (ctx , datagram , curTime );
532+ this .sendDatagram (ctx , datagram , curTime , this . sentDatagrams );
525533 }
526534
527535 private int sendStaleDatagrams (ChannelHandlerContext ctx , long curTime ) {
@@ -533,7 +541,10 @@ private int sendStaleDatagrams(ChannelHandlerContext ctx, long curTime) {
533541 int resendCount = 0 ;
534542 int transmissionBandwidth = this .slidingWindow .getRetransmissionBandwidth ();
535543
536- for (RakDatagramPacket datagram : this .sentDatagrams .values ()) {
544+ IntObjectMap <RakDatagramPacket > sent = new IntObjectHashMap <>();
545+ Iterator <RakDatagramPacket > iterator = this .sentDatagrams .values ().iterator ();
546+ while (iterator .hasNext ()) {
547+ RakDatagramPacket datagram = iterator .next ();
537548 if (datagram .getNextSend () <= curTime ) {
538549 int size = datagram .getSize ();
539550 if (transmissionBandwidth < size ) {
@@ -548,9 +559,13 @@ private int sendStaleDatagrams(ChannelHandlerContext ctx, long curTime) {
548559 log .trace ("Stale datagram {} from {}" , datagram .getSequenceIndex (), this .getRemoteAddress ());
549560 }
550561 resendCount ++;
551- this .sendDatagram (ctx , datagram , curTime );
562+ iterator .remove ();
563+ this .sendDatagram (ctx , datagram , curTime , sent );
552564 }
553565 }
566+ for (IntObjectMap .PrimitiveEntry <RakDatagramPacket > entry : sent .entries ()) {
567+ this .sentDatagrams .put (entry .key (), entry .value ());
568+ }
554569
555570 if (hasResent ) {
556571 this .slidingWindow .onResend (curTime );
@@ -580,7 +595,7 @@ private void sendDatagrams(ChannelHandlerContext ctx, long curTime, int mtuSize)
580595
581596 // Send full datagram
582597 if (!datagram .tryAddPacket (packet , mtuSize )) {
583- this .sendDatagram (ctx , datagram , curTime );
598+ this .sendDatagram (ctx , datagram , curTime , this . sentDatagrams );
584599
585600 datagram = RakDatagramPacket .newInstance ();
586601 datagram .setSendTime (curTime );
@@ -591,7 +606,7 @@ private void sendDatagrams(ChannelHandlerContext ctx, long curTime, int mtuSize)
591606 }
592607
593608 if (!datagram .getPackets ().isEmpty ()) {
594- this .sendDatagram (ctx , datagram , curTime );
609+ this .sendDatagram (ctx , datagram , curTime , this . sentDatagrams );
595610 }
596611 }
597612
@@ -603,19 +618,12 @@ private void sendImmediate(ChannelHandlerContext ctx, EncapsulatedPacket[] packe
603618 if (!datagram .tryAddPacket (packet , this .getMtu ())) {
604619 throw new IllegalArgumentException ("Packet too large to fit in MTU (size: " + packet .getSize () + ", MTU: " + this .getMtu () + ")" );
605620 }
606- this .sendDatagram (ctx , datagram , curTime );
621+ this .sendDatagram (ctx , datagram , curTime , this . sentDatagrams );
607622 }
608623 ctx .flush ();
609624 }
610625
611- private void sendDatagram (ChannelHandlerContext ctx , RakDatagramPacket datagram , long time ) {
612- if (!this .channel .parent ().eventLoop ().inEventLoop ()) {
613- // Make sure this runs on correct thread
614- log .error ("Tried to send datagrams from wrong thread: {}" , Thread .currentThread ().getName (), new Throwable ());
615- this .channel .parent ().eventLoop ().execute (() -> this .sendDatagram (ctx , datagram , time ));
616- return ;
617- }
618-
626+ private void sendDatagram (ChannelHandlerContext ctx , RakDatagramPacket datagram , long time , IntObjectMap <RakDatagramPacket > sent ) {
619627 if (datagram .getPackets ().isEmpty ()) {
620628 throw new IllegalArgumentException ("RakNetDatagram with no packets" );
621629 }
@@ -634,10 +642,8 @@ private void sendDatagram(ChannelHandlerContext ctx, RakDatagramPacket datagram,
634642 datagram .setNextSend (time + this .slidingWindow .getRtoForRetransmission ());
635643 if (oldIndex == -1 ) {
636644 this .slidingWindow .onReliableSend (datagram );
637- } else {
638- this .sentDatagrams .remove (oldIndex , datagram );
639645 }
640- this . sentDatagrams .put (datagram .getSequenceIndex (), datagram .retain ()); // Keep for resending
646+ sent .put (datagram .getSequenceIndex (), datagram .retain ()); // Keep for resending
641647 break ;
642648 }
643649 }
0 commit comments