From 0dd3bca1b801dcca0e06b0c5e9cc375d9f2338bc Mon Sep 17 00:00:00 2001 From: Oknet Xu Date: Sun, 28 Jan 2018 17:55:16 +0800 Subject: [PATCH 1/6] Add QUICPollCont --- iocore/net/I_UDPPacket.h | 6 ++ iocore/net/P_QUICNet.h | 60 ++++++++++++++ iocore/net/P_QUICNetProcessor.h | 2 + iocore/net/P_QUICNetVConnection.h | 2 + iocore/net/P_UnixNet.h | 24 +++++- iocore/net/QUICNet.cc | 125 ++++++++++++++++++++++++++++++ iocore/net/QUICNetVConnection.cc | 50 +++++++++++- iocore/net/QUICPacketHandler.cc | 23 +++--- 8 files changed, 274 insertions(+), 18 deletions(-) create mode 100644 iocore/net/P_QUICNet.h create mode 100644 iocore/net/QUICNet.cc diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h index df899ff389d..271a2502e1a 100644 --- a/iocore/net/I_UDPPacket.h +++ b/iocore/net/I_UDPPacket.h @@ -62,6 +62,12 @@ class UDPPacket IpEndpoint to; // what address to send to int from_size; + typedef union udppacket_data { + void *ptr; + uint32_t u32; + uint64_t u64; + } udppacket_data_t; + udppacket_data_t data; LINK(UDPPacket, link); }; diff --git a/iocore/net/P_QUICNet.h b/iocore/net/P_QUICNet.h new file mode 100644 index 00000000000..fba0e96694d --- /dev/null +++ b/iocore/net/P_QUICNet.h @@ -0,0 +1,60 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#ifndef __P_QUICNET_H__ +#define __P_QUICNET_H__ + +#include + +#include "ts/ink_platform.h" + +#include "P_Net.h" +class NetHandler; +typedef int (NetHandler::*NetContHandler)(int, void *); + +struct QUICPollCont : public Continuation { + NetHandler *net_handler; + PollDescriptor *pollDescriptor; + + QUICPollCont(Ptr &m); + QUICPollCont(Ptr &m, NetHandler *nh); + ~QUICPollCont(); + int pollEvent(int, Event *); + +public: + // Atomic Queue to save incoming packets + ASLL(UDPPacketInternal, alink) inQueue; + + // Internal Queue to save Long Header Packet + Que(UDPPacket, link) longInQueue; + // Internal Queue to save Short Header Packet + Que(UDPPacket, link) shortInQueue; +}; + +static inline QUICPollCont * +get_QUICPollCont(EThread *t) +{ + return (QUICPollCont *)ETHREAD_GET_PTR(t, quic_NetProcessor.quicPollCont_offset); +} + +#endif diff --git a/iocore/net/P_QUICNetProcessor.h b/iocore/net/P_QUICNetProcessor.h index 29720609a00..2b7eb10f48c 100644 --- a/iocore/net/P_QUICNetProcessor.h +++ b/iocore/net/P_QUICNetProcessor.h @@ -67,6 +67,8 @@ class QUICNetProcessor : public UnixNetProcessor Action *main_accept(Continuation *cont, SOCKET fd, AcceptOptions const &opt) override; + off_t quicPollCont_offset; + private: QUICNetProcessor(const QUICNetProcessor &); QUICNetProcessor &operator=(const QUICNetProcessor &); diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h index a82c37bd297..4e999d95d8c 100644 --- a/iocore/net/P_QUICNetVConnection.h +++ b/iocore/net/P_QUICNetVConnection.h @@ -302,4 +302,6 @@ class QUICNetVConnection : public UnixNetVConnection, public QUICConnection QUICStatelessResetToken _reset_token; }; +typedef int (QUICNetVConnection::*QUICNetVConnHandler)(int, void *); + extern ClassAllocator quicNetVCAllocator; diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index 6c69bd35957..6b9ad5f72c5 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -575,8 +575,15 @@ EventIO::start(EventLoop l, NetAccept *vc, int events) TS_INLINE int EventIO::start(EventLoop l, UnixNetVConnection *vc, int events) { + int r; type = EVENTIO_READWRITE_VC; - return start(l, vc->con.fd, (Continuation *)vc, events); + r = start(l, vc->con.fd, (Continuation *)vc, events); + if (r < 0 && vc->options.ip_proto == NetVCOptions::USE_UDP) { + // Hack for QUICNetVC + return 0; + } else { + return r; + } } TS_INLINE int EventIO::start(EventLoop l, UnixUDPConnection *vc, int events) @@ -611,6 +618,12 @@ EventIO::start(EventLoop l, int afd, Continuation *c, int e) data.c = c; fd = afd; event_loop = l; + // Hack for QUICNetVC: + // quicnetvc->con.fd == NO_FD + // quicnetvc->options.ip_proto == NetVCOptions::USE_UDP + if (afd == NO_FD) { + return -1; + } #if TS_USE_EPOLL struct epoll_event ev; memset(&ev, 0, sizeof(ev)); @@ -643,6 +656,9 @@ EventIO::start(EventLoop l, int afd, Continuation *c, int e) TS_INLINE int EventIO::modify(int e) { + if (fd == NO_FD) { + return 0; + } ink_assert(event_loop); #if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER) struct epoll_event ev; @@ -722,6 +738,9 @@ EventIO::modify(int e) TS_INLINE int EventIO::refresh(int e) { + if (fd == NO_FD) { + return 0; + } ink_assert(event_loop); #if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER) e = e & events; @@ -763,6 +782,9 @@ EventIO::refresh(int e) TS_INLINE int EventIO::stop() { + if (fd == NO_FD) { + return 0; + } if (event_loop) { int retval = 0; #if TS_USE_EPOLL diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc new file mode 100644 index 00000000000..c40121fb70d --- /dev/null +++ b/iocore/net/QUICNet.cc @@ -0,0 +1,125 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "P_Net.h" + +QUICPollCont::QUICPollCont(Ptr &m) + : Continuation(m.get()), net_handler(nullptr) +{ + SET_HANDLER(&PollCont::pollEvent); +} + +QUICPollCont::QUICPollCont(Ptr &m, NetHandler *nh) + : Continuation(m.get()), net_handler(nh) +{ + SET_HANDLER(&QUICPollCont::pollEvent); +} + +QUICPollCont::~QUICPollCont() +{ +} + +// +// QUICPollCont continuation which traverse the inQueue(ASLL) +// and create new QUICNetVC for Initial Packet, +// and push the triggered QUICNetVC into enable list. +// +int +QUICPollCont::pollEvent(int, Event *) +{ + UnixUDPConnection *uc; + QUICPacketHandler *ph; + QUICNetVConnection *vc; + QUICConnectionId cid; + uint8_t *buf; + uint8_t ptype; + UDPPacket *packet_r; + UDPPacketInternal *p = nullptr; + NetHandler *nh = get_NetHandler(t); + + // Process the ASLL + SList(UDPPacketInternal, alink) aq(inQueue.popall()); + Queue result; + while ((p = aq.pop())) { + result.push(p); + } + + while ((p = result.pop())) { + uc = static_cast(p->getConnection()); + ph = static_cast(uc->continuation); + vc = static_cast(p->data.ptr); + buf = (uint8_t *)p->getIOBlockChain()->buf(); + cid = QUICPacket::connection_id(buf) + if (buf[0] & 0x80) { // Long Header Packet with Connection ID, has a valid type value. + ptype = buf[0] & 0x7f; + if (ptype == QUICPacketType::INITIAL) { // Initial Packet + vc->read.triggered = 1; + vc->push_packet(p); + // reschedule the vc and callback vc->acceptEvent + this_ethread()->schedule_imm(vc); + } elseif (ptype == QUICPacketType::ZERO_RTT_PROTECTED) { // 0-RTT Packet + // TODO: + } elseif (ptype == QUICPacketType::HANDSHAKE) { // Handshake Packet + if (vc) { + vc->read.triggered = 1; + vc->push_packet(p); + } else { + longInQueue.push(p); + } + } else { + ink_assert(!"not reached!"); + } + } elseif (buf[0] & 0x40) { // Short Header Packet with Connection ID, has a valid type value. + if (vc) { + vc->read.triggered = 1; + vc->push_packet(p); + } else { + shortInQueue.push(p); + } + } else { + ink_assert(!"not reached!"); + } + + // Push QUICNetVC into nethandler's enabled list + if (vc != nullptr) { + int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(vc); + } + } + } + + return EVENT_CONT; +} + +void +initialize_thread_for_quic_net(EThread *thread) +{ + NetHandler *nh = get_NetHandler(thread); + QUICPollCont *quicpc = get_QUICPollCont(thread); + + new ((ink_dummy_for_new *)quicpc) QUICPollCont(thread->mutex, nh); + + thread->schedule_every(quicpc, -9); +} + diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc index 3967513b74c..b0088c851f1 100644 --- a/iocore/net/QUICNetVConnection.cc +++ b/iocore/net/QUICNetVConnection.cc @@ -63,7 +63,7 @@ void QUICNetVConnection::init(QUICConnectionId original_cid, UDPConnection *udp_con, QUICPacketHandler *packet_handler, QUICConnectionTable *ctable) { - SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake); + SET_HANDLER((NetVConnHandler)&QUICNetVConnection::acceptEvent); this->_packet_transmitter_mutex = new_ProxyMutex(); this->_frame_transmitter_mutex = new_ProxyMutex(); this->_udp_con = udp_con; @@ -93,6 +93,51 @@ QUICNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader return nullptr; } +int +QUICNetVConnection::acceptEvent(int event, Event *e) +{ + EThread *t = (e == nullptr) ? this_ethread() : e->ethread; + NetHandler *h = get_NetHandler(t); + + MUTEX_TRY_LOCK(lock, h->mutex, t); + if (!lock.is_locked()) { + if (event == EVENT_NONE) { + t->schedule_in(this, HRTIME_MSECONDS(net_retry_delay)); + return EVENT_DONE; + } else { + e->schedule_in(HRTIME_MSECONDS(net_retry_delay)); + return EVENT_CONT; + } + } + + thread = t; + + // Send this NetVC to NetHandler and start to polling read & write event. + if (h->startIO(this) < 0) { + free(t); + return EVENT_DONE; + } + + // Handshake callback handler. + SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake); + + // Send this netvc to InactivityCop. + nh->startCop(this); + + if (inactivity_timeout_in) { + set_inactivity_timeout(inactivity_timeout_in); + } else { + set_inactivity_timeout(0); + } + + if (active_timeout_in) { + set_active_timeout(active_timeout_in); + } + + action_.continuation->handleEvent(NET_EVENT_ACCEPT, this); + return EVENT_DONE; +} + int QUICNetVConnection::startEvent(int /*event ATS_UNUSED */, Event *e) { @@ -589,8 +634,7 @@ QUICNetVConnection::get_udp_con() void QUICNetVConnection::net_read_io(NetHandler *nh, EThread *lthread) { - ink_assert(false); - + this->handleEvent(QUIC_EVENT_PACKET_READ_READY, nullptr); return; } diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc index e60b442afcf..8ac76c0ea9c 100644 --- a/iocore/net/QUICPacketHandler.cc +++ b/iocore/net/QUICPacketHandler.cc @@ -124,6 +124,7 @@ QUICPacketHandlerIn::init_accept(EThread *t = nullptr) void QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) { + EThread *eth; IOBufferBlock *block = udp_packet->getIOBlockChain(); if (is_debug_tag_set("quic_sec")) { @@ -158,6 +159,8 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) return; } + eth = eventProcessor.assign_thread(ET_NET); + // Create a new NetVConnection QUICConnectionId original_cid = this->_read_connection_id(block); QUICNetVConnection *vc = static_cast(getNetProcessor()->allocate_vc(nullptr)); @@ -165,29 +168,21 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) vc->id = net_next_connection_number(); vc->con.move(con); vc->submit_time = Thread::get_hrtime(); - vc->mutex = this->mutex; + vc->thread = eth; + vc->mutex = new_ProxyMutex(); vc->action_ = *this->action_; vc->set_is_transparent(this->opt.f_inbound_transparent); vc->set_context(NET_VCONNECTION_IN); - vc->read.triggered = 1; - vc->start(this->_ssl_ctx); vc->options.ip_proto = NetVCOptions::USE_UDP; vc->options.ip_family = udp_packet->from.sa.sa_family; - this->action_->continuation->handleEvent(NET_EVENT_ACCEPT, vc); qc = vc; } - if (qc->is_closed()) { - this->_ctable.erase(qc->connection_id(), qc); - // FIXME QUICNetVConnection is NOT freed to prevent crashes. #2674 - // QUICNetVConnections are going to be freed by QUICNetHandler - // vc->free(vc->thread); - } else { - qc->handle_received_packet(udp_packet); - // FIXME This cast is temporal. It'll be removed when we introduce QUICNetHandler. - eventProcessor.schedule_imm(static_cast(qc), ET_CALL, QUIC_EVENT_PACKET_READ_READY, nullptr); - } + // Push the packet into QUICPollCont + udp_packet->data.ptr = vc; + get_QUICPollCont(eth)->inQueue.push(udp_packet); + } // TODO: Should be called via eventProcessor? From 95bd1db4395dc95b92964e1584f69250b9186c76 Mon Sep 17 00:00:00 2001 From: scw00 Date: Sun, 28 Jan 2018 21:22:41 +0800 Subject: [PATCH 2/6] complete pollcont --- cmd/traffic_quic/quic_client.h | 1 + cmd/traffic_quic/traffic_quic.cc | 2 + iocore/net/I_UDPPacket.h | 2 +- iocore/net/Makefile.am | 2 + iocore/net/P_Net.h | 1 + iocore/net/P_QUICNet.h | 6 ++ iocore/net/P_QUICNetProcessor.h | 1 + iocore/net/P_QUICNetVConnection.h | 3 + iocore/net/QUICNet.cc | 141 ++++++++++++++++++------------ iocore/net/QUICNetProcessor.cc | 10 +++ iocore/net/QUICNetVConnection.cc | 1 + iocore/net/QUICPacketHandler.cc | 14 ++- proxy/Main.cc | 4 + 13 files changed, 129 insertions(+), 59 deletions(-) diff --git a/cmd/traffic_quic/quic_client.h b/cmd/traffic_quic/quic_client.h index 05d0c47581d..575d7a18c41 100644 --- a/cmd/traffic_quic/quic_client.h +++ b/cmd/traffic_quic/quic_client.h @@ -23,6 +23,7 @@ #pragma once +#include "P_Net.h" #include "I_EventSystem.h" #include "I_NetVConnection.h" #include "P_QUICNetProcessor.h" diff --git a/cmd/traffic_quic/traffic_quic.cc b/cmd/traffic_quic/traffic_quic.cc index 5e483491d7c..19ea9c2f533 100644 --- a/cmd/traffic_quic/traffic_quic.cc +++ b/cmd/traffic_quic/traffic_quic.cc @@ -78,6 +78,8 @@ main(int argc, const char **argv) SSLInitializeLibrary(); SSLConfig::startup(); + quic_NetProcessor.init(); + ink_event_system_init(EVENT_SYSTEM_MODULE_VERSION); eventProcessor.start(THREADS); udpNet.start(1, stacksize); diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h index 271a2502e1a..fd047e56b97 100644 --- a/iocore/net/I_UDPPacket.h +++ b/iocore/net/I_UDPPacket.h @@ -63,7 +63,7 @@ class UDPPacket int from_size; typedef union udppacket_data { - void *ptr; + void *ptr; uint32_t u32; uint64_t u64; } udppacket_data_t; diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am index b044e80c34e..f32cc13678a 100644 --- a/iocore/net/Makefile.am +++ b/iocore/net/Makefile.am @@ -162,10 +162,12 @@ libinknet_a_SOURCES = \ if ENABLE_QUIC libinknet_a_SOURCES += \ P_QUICPacketHandler.h \ + P_QUICNet.h \ P_QUICNetProcessor.h \ P_QUICNetVConnection.h \ P_QUICNextProtocolAccept.h \ QUICPacketHandler.cc \ + QUICNet.cc \ QUICNetProcessor.cc \ QUICNetVConnection.cc \ QUICNextProtocolAccept.cc diff --git a/iocore/net/P_Net.h b/iocore/net/P_Net.h index 0a7a502a3ea..38504ee0c0a 100644 --- a/iocore/net/P_Net.h +++ b/iocore/net/P_Net.h @@ -113,6 +113,7 @@ extern RecRawStatBlock *net_rsb; #include "P_QUICNetVConnection.h" #include "P_QUICNetProcessor.h" #include "P_QUICPacketHandler.h" +#include "P_QUICNet.h" #endif // #include "P_QUICCertLookup.h" diff --git a/iocore/net/P_QUICNet.h b/iocore/net/P_QUICNet.h index fba0e96694d..7231207885e 100644 --- a/iocore/net/P_QUICNet.h +++ b/iocore/net/P_QUICNet.h @@ -32,6 +32,8 @@ class NetHandler; typedef int (NetHandler::*NetContHandler)(int, void *); +void initialize_thread_for_quic_net(EThread *thread); + struct QUICPollCont : public Continuation { NetHandler *net_handler; PollDescriptor *pollDescriptor; @@ -49,6 +51,10 @@ struct QUICPollCont : public Continuation { Que(UDPPacket, link) longInQueue; // Internal Queue to save Short Header Packet Que(UDPPacket, link) shortInQueue; + +private: + void _process_short_header_packet(UDPPacketInternal *p, NetHandler *nh); + void _process_long_header_packet(UDPPacketInternal *p, NetHandler *nh); }; static inline QUICPollCont * diff --git a/iocore/net/P_QUICNetProcessor.h b/iocore/net/P_QUICNetProcessor.h index 2b7eb10f48c..c9d6c2adfb5 100644 --- a/iocore/net/P_QUICNetProcessor.h +++ b/iocore/net/P_QUICNetProcessor.h @@ -56,6 +56,7 @@ class QUICNetProcessor : public UnixNetProcessor QUICNetProcessor(); virtual ~QUICNetProcessor(); + void init() override; virtual int start(int, size_t stacksize) override; void cleanup(); // TODO: refactoring NetProcessor::connect_re and UnixNetProcessor::connect_re_internal diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h index 4e999d95d8c..8f4d7e56656 100644 --- a/iocore/net/P_QUICNetVConnection.h +++ b/iocore/net/P_QUICNetVConnection.h @@ -145,6 +145,9 @@ class QUICNetVConnection : public UnixNetVConnection, public QUICConnection QUICNetVConnection() {} void init(QUICConnectionId original_cid, UDPConnection *, QUICPacketHandler *, QUICConnectionTable *ctable = nullptr); + // accept new conn_id + int acceptEvent(int event, Event *e); + // UnixNetVConnection void reenable(VIO *vio) override; VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override; diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc index c40121fb70d..3e730afcf18 100644 --- a/iocore/net/QUICNet.cc +++ b/iocore/net/QUICNet.cc @@ -23,14 +23,12 @@ #include "P_Net.h" -QUICPollCont::QUICPollCont(Ptr &m) - : Continuation(m.get()), net_handler(nullptr) +QUICPollCont::QUICPollCont(Ptr &m) : Continuation(m.get()), net_handler(nullptr) { - SET_HANDLER(&PollCont::pollEvent); + SET_HANDLER(&QUICPollCont::pollEvent); } -QUICPollCont::QUICPollCont(Ptr &m, NetHandler *nh) - : Continuation(m.get()), net_handler(nh) +QUICPollCont::QUICPollCont(Ptr &m, NetHandler *nh) : Continuation(m.get()), net_handler(nh) { SET_HANDLER(&QUICPollCont::pollEvent); } @@ -39,6 +37,83 @@ QUICPollCont::~QUICPollCont() { } +void +QUICPollCont::_process_long_header_packet(UDPPacketInternal *p, NetHandler *nh) +{ + QUICNetVConnection *vc; + QUICPacketType ptype; + uint8_t *buf; + + // FIXME: VC is nullptr ? + vc = static_cast(p->data.ptr); + buf = (uint8_t *)p->getIOBlockChain()->buf(); + if (!QUICTypeUtil::has_connection_id(reinterpret_cast(buf))) { + // TODO: Some packets may not have connection id + p->free(); + return; + } + + ptype = static_cast(buf[0] & 0x7f); + switch (ptype) { + case QUICPacketType::INITIAL: + vc->read.triggered = 1; + vc->handle_received_packet(p); + this->mutex->thread_holding->schedule_imm(vc); + return; + case QUICPacketType::ZERO_RTT_PROTECTED: + // TODO:: do something ? + // break; + case QUICPacketType::HANDSHAKE: + default: + // Just Pass Through + if (vc) { + vc->read.triggered = 1; + vc->handle_received_packet(p); + } else { + longInQueue.push(p); + } + + // Push QUICNetVC into nethandler's enabled list + if (vc != nullptr) { + int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(vc); + } + } + break; + } +} + +void +QUICPollCont::_process_short_header_packet(UDPPacketInternal *p, NetHandler *nh) +{ + QUICNetVConnection *vc; + uint8_t *buf; + + vc = static_cast(p->data.ptr); + buf = (uint8_t *)p->getIOBlockChain()->buf(); + if (!QUICTypeUtil::has_connection_id(reinterpret_cast(buf))) { + // TODO: Some packets may not have connection id + p->free(); + return; + } + + if (vc) { + vc->read.triggered = 1; + vc->handle_received_packet(p); + } else { + shortInQueue.push(p); + } + + // Push QUICNetVC into nethandler's enabled list + if (vc != nullptr) { + int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(vc); + } + } +} + // // QUICPollCont continuation which traverse the inQueue(ASLL) // and create new QUICNetVC for Initial Packet, @@ -47,15 +122,10 @@ QUICPollCont::~QUICPollCont() int QUICPollCont::pollEvent(int, Event *) { - UnixUDPConnection *uc; - QUICPacketHandler *ph; - QUICNetVConnection *vc; - QUICConnectionId cid; + ink_assert(this->mutex->thread_holding == this_thread()); uint8_t *buf; - uint8_t ptype; - UDPPacket *packet_r; UDPPacketInternal *p = nullptr; - NetHandler *nh = get_NetHandler(t); + NetHandler *nh = get_NetHandler(this->mutex->thread_holding); // Process the ASLL SList(UDPPacketInternal, alink) aq(inQueue.popall()); @@ -65,50 +135,14 @@ QUICPollCont::pollEvent(int, Event *) } while ((p = result.pop())) { - uc = static_cast(p->getConnection()); - ph = static_cast(uc->continuation); - vc = static_cast(p->data.ptr); buf = (uint8_t *)p->getIOBlockChain()->buf(); - cid = QUICPacket::connection_id(buf) - if (buf[0] & 0x80) { // Long Header Packet with Connection ID, has a valid type value. - ptype = buf[0] & 0x7f; - if (ptype == QUICPacketType::INITIAL) { // Initial Packet - vc->read.triggered = 1; - vc->push_packet(p); - // reschedule the vc and callback vc->acceptEvent - this_ethread()->schedule_imm(vc); - } elseif (ptype == QUICPacketType::ZERO_RTT_PROTECTED) { // 0-RTT Packet - // TODO: - } elseif (ptype == QUICPacketType::HANDSHAKE) { // Handshake Packet - if (vc) { - vc->read.triggered = 1; - vc->push_packet(p); - } else { - longInQueue.push(p); - } - } else { - ink_assert(!"not reached!"); - } - } elseif (buf[0] & 0x40) { // Short Header Packet with Connection ID, has a valid type value. - if (vc) { - vc->read.triggered = 1; - vc->push_packet(p); - } else { - shortInQueue.push(p); - } - } else { - ink_assert(!"not reached!"); - } - - // Push QUICNetVC into nethandler's enabled list - if (vc != nullptr) { - int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1); - if (!isin) { - nh->read_enable_list.push(vc); - } + if (QUICTypeUtil::has_long_header(buf)) { // Long Header Packet with Connection ID, has a valid type value. + this->_process_long_header_packet(p, nh); + } else { // Short Header Packet with Connection ID, has a valid type value. + this->_process_short_header_packet(p, nh); } } - + return EVENT_CONT; } @@ -122,4 +156,3 @@ initialize_thread_for_quic_net(EThread *thread) thread->schedule_every(quicpc, -9); } - diff --git a/iocore/net/QUICNetProcessor.cc b/iocore/net/QUICNetProcessor.cc index 8dc335f6653..da372c62550 100644 --- a/iocore/net/QUICNetProcessor.cc +++ b/iocore/net/QUICNetProcessor.cc @@ -50,6 +50,16 @@ QUICNetProcessor::cleanup() SSL_CTX_free(this->_ssl_ctx); } +void +QUICNetProcessor::init() +{ + // first we allocate a QUICPollCont. + this->quicPollCont_offset = eventProcessor.allocate(sizeof(QUICPollCont)); + + // schedule event + eventProcessor.schedule_spawn(&initialize_thread_for_quic_net, ET_NET); +} + int QUICNetProcessor::start(int, size_t stacksize) { diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc index b0088c851f1..e71c109161f 100644 --- a/iocore/net/QUICNetVConnection.cc +++ b/iocore/net/QUICNetVConnection.cc @@ -117,6 +117,7 @@ QUICNetVConnection::acceptEvent(int event, Event *e) free(t); return EVENT_DONE; } + this->read.enabled = 1; // Handshake callback handler. SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake); diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc index 8ac76c0ea9c..a09c3941f5e 100644 --- a/iocore/net/QUICPacketHandler.cc +++ b/iocore/net/QUICPacketHandler.cc @@ -125,7 +125,8 @@ void QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) { EThread *eth; - IOBufferBlock *block = udp_packet->getIOBlockChain(); + QUICNetVConnection *vc = nullptr; + IOBufferBlock *block = udp_packet->getIOBlockChain(); if (is_debug_tag_set("quic_sec")) { ip_port_text_buffer ipb; @@ -163,7 +164,7 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) // Create a new NetVConnection QUICConnectionId original_cid = this->_read_connection_id(block); - QUICNetVConnection *vc = static_cast(getNetProcessor()->allocate_vc(nullptr)); + vc = static_cast(getNetProcessor()->allocate_vc(nullptr)); vc->init(original_cid, udp_packet->getConnection(), this, &this->_ctable); vc->id = net_next_connection_number(); vc->con.move(con); @@ -173,16 +174,21 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) vc->action_ = *this->action_; vc->set_is_transparent(this->opt.f_inbound_transparent); vc->set_context(NET_VCONNECTION_IN); + vc->read.triggered = 1; + vc->start(this->_ssl_ctx); vc->options.ip_proto = NetVCOptions::USE_UDP; vc->options.ip_family = udp_packet->from.sa.sa_family; qc = vc; + } else { + vc = static_cast(qc); + eth = vc->thread; } // Push the packet into QUICPollCont udp_packet->data.ptr = vc; - get_QUICPollCont(eth)->inQueue.push(udp_packet); - + // should we use dynamic_cast ?? + get_QUICPollCont(eth)->inQueue.push(static_cast(udp_packet)); } // TODO: Should be called via eventProcessor? diff --git a/proxy/Main.cc b/proxy/Main.cc index 5e36c0e16df..5614444ba0f 100644 --- a/proxy/Main.cc +++ b/proxy/Main.cc @@ -1800,6 +1800,10 @@ main(int /* argc ATS_UNUSED */, const char **argv) // Do the inits for NetProcessors that use ET_NET threads. MUST be before starting those threads. netProcessor.init(); init_HttpProxyServer(); +#if TS_USE_QUIC == 1 + // OK, pushing a spawn scheduling here + quic_NetProcessor.init(); +#endif // !! ET_NET threads start here !! // This means any spawn scheduling must be done before this point. From 28374108068e9dfbd3c81cdf839292c7d8d950c9 Mon Sep 17 00:00:00 2001 From: scw00 Date: Wed, 31 Jan 2018 14:35:20 +0800 Subject: [PATCH 3/6] [QUIC Client] fix the quic client --- cmd/traffic_quic/traffic_quic.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/traffic_quic/traffic_quic.cc b/cmd/traffic_quic/traffic_quic.cc index 19ea9c2f533..9749ce19900 100644 --- a/cmd/traffic_quic/traffic_quic.cc +++ b/cmd/traffic_quic/traffic_quic.cc @@ -74,10 +74,12 @@ main(int argc, const char **argv) Thread *main_thread = new EThread; main_thread->set_specific(); net_config_poll_timeout = 10; + ink_net_init(makeModuleVersion(1, 0, PRIVATE_MODULE_HEADER)); SSLInitializeLibrary(); SSLConfig::startup(); + netProcessor.init(); quic_NetProcessor.init(); ink_event_system_init(EVENT_SYSTEM_MODULE_VERSION); @@ -120,7 +122,7 @@ DNSConnection::trigger() void StatPagesManager::register_http(char const *, Action *(*)(Continuation *, HTTPHdr *)) { - ink_assert(false); + // ink_assert(false); } #include "ParentSelection.h" From ef1538587c96ac013e33141d6ea42ef9e159d620 Mon Sep 17 00:00:00 2001 From: scw00 Date: Sat, 3 Feb 2018 08:59:27 +0800 Subject: [PATCH 4/6] Add syscall into EventIO for qvc to avoid system call --- iocore/net/P_UDPNet.h | 2 ++ iocore/net/P_UnixNet.h | 33 ++++++++++++++++---------------- iocore/net/QUICNet.cc | 2 +- iocore/net/QUICNetProcessor.cc | 2 ++ iocore/net/QUICNetVConnection.cc | 1 + iocore/net/UnixUDPNet.cc | 4 ++-- 6 files changed, 24 insertions(+), 20 deletions(-) diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h index 599b6af6068..52de3b8b0ef 100644 --- a/iocore/net/P_UDPNet.h +++ b/iocore/net/P_UDPNet.h @@ -59,6 +59,8 @@ extern UDPNetProcessorInternal udpNetInternal; #define SLOT_TIME HRTIME_MSECONDS(SLOT_TIME_MSEC) #define N_SLOTS 2048 +constexpr int UDP_PERIOD = 9; + class PacketQueue { public: diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index 6b9ad5f72c5..ab45b663017 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -83,6 +83,7 @@ struct EventIO { int events = 0; #endif EventLoop event_loop = nullptr; + bool syscall = true; int type = 0; union { Continuation *c; @@ -575,15 +576,8 @@ EventIO::start(EventLoop l, NetAccept *vc, int events) TS_INLINE int EventIO::start(EventLoop l, UnixNetVConnection *vc, int events) { - int r; type = EVENTIO_READWRITE_VC; - r = start(l, vc->con.fd, (Continuation *)vc, events); - if (r < 0 && vc->options.ip_proto == NetVCOptions::USE_UDP) { - // Hack for QUICNetVC - return 0; - } else { - return r; - } + return start(l, vc->con.fd, (Continuation *)vc, events); } TS_INLINE int EventIO::start(EventLoop l, UnixUDPConnection *vc, int events) @@ -594,6 +588,10 @@ EventIO::start(EventLoop l, UnixUDPConnection *vc, int events) TS_INLINE int EventIO::close() { + if (!this->syscall) { + return 0; + } + stop(); switch (type) { default: @@ -615,15 +613,13 @@ EventIO::close() TS_INLINE int EventIO::start(EventLoop l, int afd, Continuation *c, int e) { + if (!this->syscall) { + return 0; + } + data.c = c; fd = afd; event_loop = l; - // Hack for QUICNetVC: - // quicnetvc->con.fd == NO_FD - // quicnetvc->options.ip_proto == NetVCOptions::USE_UDP - if (afd == NO_FD) { - return -1; - } #if TS_USE_EPOLL struct epoll_event ev; memset(&ev, 0, sizeof(ev)); @@ -656,9 +652,10 @@ EventIO::start(EventLoop l, int afd, Continuation *c, int e) TS_INLINE int EventIO::modify(int e) { - if (fd == NO_FD) { + if (!this->syscall) { return 0; } + ink_assert(event_loop); #if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER) struct epoll_event ev; @@ -738,9 +735,10 @@ EventIO::modify(int e) TS_INLINE int EventIO::refresh(int e) { - if (fd == NO_FD) { + if (!this->syscall) { return 0; } + ink_assert(event_loop); #if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER) e = e & events; @@ -782,9 +780,10 @@ EventIO::refresh(int e) TS_INLINE int EventIO::stop() { - if (fd == NO_FD) { + if (!this->syscall) { return 0; } + if (event_loop) { int retval = 0; #if TS_USE_EPOLL diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc index 3e730afcf18..33763b8a98a 100644 --- a/iocore/net/QUICNet.cc +++ b/iocore/net/QUICNet.cc @@ -154,5 +154,5 @@ initialize_thread_for_quic_net(EThread *thread) new ((ink_dummy_for_new *)quicpc) QUICPollCont(thread->mutex, nh); - thread->schedule_every(quicpc, -9); + thread->schedule_every(quicpc, -UDP_PERIOD); } diff --git a/iocore/net/QUICNetProcessor.cc b/iocore/net/QUICNetProcessor.cc index da372c62550..1064bd79bdc 100644 --- a/iocore/net/QUICNetProcessor.cc +++ b/iocore/net/QUICNetProcessor.cc @@ -181,6 +181,8 @@ QUICNetProcessor::connect_re(Continuation *cont, sockaddr const *remote_addr, Ne vc->mutex = cont->mutex; vc->action_ = cont; + SET_CONTINUATION_HANDLER(vc, &QUICNetVConnection::state_pre_handshake); + vc->start(this->_ssl_ctx); vc->connectUp(t, NO_FD); diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc index e71c109161f..ee1d16f2482 100644 --- a/iocore/net/QUICNetVConnection.cc +++ b/iocore/net/QUICNetVConnection.cc @@ -70,6 +70,7 @@ QUICNetVConnection::init(QUICConnectionId original_cid, UDPConnection *udp_con, this->_packet_handler = packet_handler; this->_original_quic_connection_id = original_cid; this->_quic_connection_id.randomize(); + this->ep.syscall = false; // PacketHandler for out going connection doesn't have connection table if (ctable) { this->_ctable = ctable; diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc index 5a682062463..d5004fe8861 100644 --- a/iocore/net/UnixUDPNet.cc +++ b/iocore/net/UnixUDPNet.cc @@ -86,7 +86,7 @@ initialize_thread_for_udp_net(EThread *thread) REC_ReadConfigInt32(g_udp_numSendRetries, "proxy.config.udp.send_retries"); g_udp_numSendRetries = g_udp_numSendRetries < 0 ? 0 : g_udp_numSendRetries; - thread->schedule_every(get_UDPPollCont(thread), -9); + thread->schedule_every(get_UDPPollCont(thread), -UDP_PERIOD); thread->schedule_imm(get_UDPNetHandler(thread)); } @@ -923,7 +923,7 @@ UDPNetHandler::startNetEvent(int event, Event *e) (void)event; SET_HANDLER((UDPNetContHandler)&UDPNetHandler::mainNetEvent); trigger_event = e; - e->schedule_every(-HRTIME_MSECONDS(9)); + e->schedule_every(-HRTIME_MSECONDS(UDP_PERIOD)); return EVENT_CONT; } From c1b2b273c03d7224b15d2e68bb6e6232309a189b Mon Sep 17 00:00:00 2001 From: scw00 Date: Sat, 3 Feb 2018 11:04:14 +0800 Subject: [PATCH 5/6] connect_re processed by nethandler --- cmd/traffic_quic/quic_client.cc | 1 + cmd/traffic_quic/traffic_quic.cc | 2 +- iocore/net/QUICNetProcessor.cc | 24 ++++++++++++++++++++-- iocore/net/QUICNetVConnection.cc | 34 ++++++++++++++++++++++++++++++-- iocore/net/UnixUDPNet.cc | 2 +- 5 files changed, 57 insertions(+), 6 deletions(-) diff --git a/cmd/traffic_quic/quic_client.cc b/cmd/traffic_quic/quic_client.cc index c42410cbd8d..d6a7176c508 100644 --- a/cmd/traffic_quic/quic_client.cc +++ b/cmd/traffic_quic/quic_client.cc @@ -51,6 +51,7 @@ QUICClient::start() NetVCOptions opt; opt.ip_proto = NetVCOptions::USE_UDP; opt.ip_family = info->ai_family; + opt.etype = ET_NET; opt.socket_recv_bufsize = 1048576; opt.socket_send_bufsize = 1048576; diff --git a/cmd/traffic_quic/traffic_quic.cc b/cmd/traffic_quic/traffic_quic.cc index 9749ce19900..c46e80f4d49 100644 --- a/cmd/traffic_quic/traffic_quic.cc +++ b/cmd/traffic_quic/traffic_quic.cc @@ -88,7 +88,7 @@ main(int argc, const char **argv) quic_NetProcessor.start(-1, stacksize); QUICClient client(addr, port); - eventProcessor.schedule_in(&client, 1, ET_UDP); + eventProcessor.schedule_in(&client, 1, ET_NET); this_thread()->execute(); } diff --git a/iocore/net/QUICNetProcessor.cc b/iocore/net/QUICNetProcessor.cc index 1064bd79bdc..979af876c56 100644 --- a/iocore/net/QUICNetProcessor.cc +++ b/iocore/net/QUICNetProcessor.cc @@ -123,6 +123,7 @@ QUICNetProcessor::allocate_vc(EThread *t) } } + vc->ep.syscall = false; return vc; } @@ -181,10 +182,29 @@ QUICNetProcessor::connect_re(Continuation *cont, sockaddr const *remote_addr, Ne vc->mutex = cont->mutex; vc->action_ = cont; - SET_CONTINUATION_HANDLER(vc, &QUICNetVConnection::state_pre_handshake); + SET_CONTINUATION_HANDLER(vc, &QUICNetVConnection::startEvent); vc->start(this->_ssl_ctx); - vc->connectUp(t, NO_FD); + + if (t->is_event_type(opt->etype)) { + MUTEX_TRY_LOCK(lock, cont->mutex, t); + if (lock.is_locked()) { + MUTEX_TRY_LOCK(lock2, get_NetHandler(t)->mutex, t); + if (lock2.is_locked()) { + vc->connectUp(t, NO_FD); + return ACTION_RESULT_DONE; + } + } + } + + // Try to stay on the current thread if it is the right type + if (t->is_event_type(opt->etype)) { + t->schedule_imm(vc); + } else { // Otherwise, pass along to another thread of the right type + eventProcessor.schedule_imm(vc, opt->etype); + } + + // vc->connectUp(t, NO_FD); return ACTION_RESULT_DONE; } diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc index ee1d16f2482..dc8823f3ff7 100644 --- a/iocore/net/QUICNetVConnection.cc +++ b/iocore/net/QUICNetVConnection.cc @@ -70,7 +70,6 @@ QUICNetVConnection::init(QUICConnectionId original_cid, UDPConnection *udp_con, this->_packet_handler = packet_handler; this->_original_quic_connection_id = original_cid; this->_quic_connection_id.randomize(); - this->ep.syscall = false; // PacketHandler for out going connection doesn't have connection table if (ctable) { this->_ctable = ctable; @@ -118,6 +117,8 @@ QUICNetVConnection::acceptEvent(int event, Event *e) free(t); return EVENT_DONE; } + + // FIXME: complete do_io_xxxx instead this->read.enabled = 1; // Handshake callback handler. @@ -141,8 +142,21 @@ QUICNetVConnection::acceptEvent(int event, Event *e) } int -QUICNetVConnection::startEvent(int /*event ATS_UNUSED */, Event *e) +QUICNetVConnection::startEvent(int event, Event *e) { + ink_assert(event == EVENT_IMMEDIATE); + MUTEX_TRY_LOCK(lock, get_NetHandler(e->ethread)->mutex, e->ethread); + if (!lock.is_locked()) { + e->schedule_in(HRTIME_MSECONDS(net_retry_delay)); + return EVENT_CONT; + } + + if (!action_.cancelled) { + this->connectUp(e->ethread, NO_FD); + } else { + this->free(e->ethread); + } + return EVENT_DONE; } @@ -220,6 +234,17 @@ QUICNetVConnection::reenable(VIO *vio) int QUICNetVConnection::connectUp(EThread *t, int fd) { + int res = 0; + NetHandler *nh = get_NetHandler(t); + this->thread = this_ethread(); + ink_assert(nh->mutex->thread_holding == this->thread); + + SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake); + + if ((res = nh->startIO(this)) < 0) { + // FIXME: startIO only return 0 now! what should we do if it failed ? + } + // create stream for handshake QUICErrorUPtr error = this->_stream_manager->create_stream(STREAM_ID_FOR_HANDSHAKE); if (error->cls != QUICErrorClass::NONE) { @@ -228,6 +253,11 @@ QUICNetVConnection::connectUp(EThread *t, int fd) return CONNECT_FAILURE; } + nh->startCop(this); + + // FIXME: complete do_io_xxxx instead + this->read.enabled = 1; + // start QUIC handshake this->_handshake_handler->handleEvent(VC_EVENT_WRITE_READY, nullptr); diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc index d5004fe8861..e3891b8ff4b 100644 --- a/iocore/net/UnixUDPNet.cc +++ b/iocore/net/UnixUDPNet.cc @@ -86,7 +86,7 @@ initialize_thread_for_udp_net(EThread *thread) REC_ReadConfigInt32(g_udp_numSendRetries, "proxy.config.udp.send_retries"); g_udp_numSendRetries = g_udp_numSendRetries < 0 ? 0 : g_udp_numSendRetries; - thread->schedule_every(get_UDPPollCont(thread), -UDP_PERIOD); + thread->schedule_every(get_UDPPollCont(thread), -HRTIME_MSECONDS(UDP_PERIOD)); thread->schedule_imm(get_UDPNetHandler(thread)); } From d7be5efac03b7ace0e41697310a2f66f76582953 Mon Sep 17 00:00:00 2001 From: scw00 Date: Tue, 6 Feb 2018 14:01:38 +0800 Subject: [PATCH 6/6] add QUICPollEvent to packet UDPPacket and QVC --- iocore/net/I_UDPPacket.h | 6 ---- iocore/net/P_QUICNet.h | 28 ++++++++++++++---- iocore/net/QUICNet.cc | 52 ++++++++++++++++++++------------- iocore/net/QUICPacketHandler.cc | 12 +++++--- 4 files changed, 62 insertions(+), 36 deletions(-) diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h index fd047e56b97..df899ff389d 100644 --- a/iocore/net/I_UDPPacket.h +++ b/iocore/net/I_UDPPacket.h @@ -62,12 +62,6 @@ class UDPPacket IpEndpoint to; // what address to send to int from_size; - typedef union udppacket_data { - void *ptr; - uint32_t u32; - uint64_t u64; - } udppacket_data_t; - udppacket_data_t data; LINK(UDPPacket, link); }; diff --git a/iocore/net/P_QUICNet.h b/iocore/net/P_QUICNet.h index 7231207885e..297ddbe3521 100644 --- a/iocore/net/P_QUICNet.h +++ b/iocore/net/P_QUICNet.h @@ -29,11 +29,28 @@ #include "ts/ink_platform.h" #include "P_Net.h" + class NetHandler; typedef int (NetHandler::*NetContHandler)(int, void *); void initialize_thread_for_quic_net(EThread *thread); +struct QUICPollEvent { + typedef union data_t { + void *ptr; + uint32_t u32; + uint64_t u64; + } data_t; + + void free(); + + data_t data; + UDPPacketInternal *packet; + + SLINK(QUICPollEvent, alink); + LINK(QUICPollEvent, link); +}; + struct QUICPollCont : public Continuation { NetHandler *net_handler; PollDescriptor *pollDescriptor; @@ -45,16 +62,16 @@ struct QUICPollCont : public Continuation { public: // Atomic Queue to save incoming packets - ASLL(UDPPacketInternal, alink) inQueue; + ASLL(QUICPollEvent, alink) inQueue; // Internal Queue to save Long Header Packet - Que(UDPPacket, link) longInQueue; + Que(UDPPacketInternal, link) longInQueue; // Internal Queue to save Short Header Packet - Que(UDPPacket, link) shortInQueue; + Que(UDPPacketInternal, link) shortInQueue; private: - void _process_short_header_packet(UDPPacketInternal *p, NetHandler *nh); - void _process_long_header_packet(UDPPacketInternal *p, NetHandler *nh); + void _process_short_header_packet(QUICPollEvent *e, NetHandler *nh); + void _process_long_header_packet(QUICPollEvent *e, NetHandler *nh); }; static inline QUICPollCont * @@ -63,4 +80,5 @@ get_QUICPollCont(EThread *t) return (QUICPollCont *)ETHREAD_GET_PTR(t, quic_NetProcessor.quicPollCont_offset); } +extern ClassAllocator quicPollEventAllocator; #endif diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc index 33763b8a98a..68c3bee11c1 100644 --- a/iocore/net/QUICNet.cc +++ b/iocore/net/QUICNet.cc @@ -23,6 +23,14 @@ #include "P_Net.h" +ClassAllocator quicPollEventAllocator("quicPollEvent"); + +void +QUICPollEvent::free() +{ + quicPollEventAllocator.free(this); +} + QUICPollCont::QUICPollCont(Ptr &m) : Continuation(m.get()), net_handler(nullptr) { SET_HANDLER(&QUICPollCont::pollEvent); @@ -38,15 +46,16 @@ QUICPollCont::~QUICPollCont() } void -QUICPollCont::_process_long_header_packet(UDPPacketInternal *p, NetHandler *nh) +QUICPollCont::_process_long_header_packet(QUICPollEvent *e, NetHandler *nh) { - QUICNetVConnection *vc; - QUICPacketType ptype; uint8_t *buf; - + QUICPacketType ptype; + UDPPacketInternal *p = e->packet; // FIXME: VC is nullptr ? - vc = static_cast(p->data.ptr); - buf = (uint8_t *)p->getIOBlockChain()->buf(); + QUICNetVConnection *vc = static_cast(e->data.ptr); + buf = (uint8_t *)p->getIOBlockChain()->buf(); + + e->free(); if (!QUICTypeUtil::has_connection_id(reinterpret_cast(buf))) { // TODO: Some packets may not have connection id p->free(); @@ -85,13 +94,14 @@ QUICPollCont::_process_long_header_packet(UDPPacketInternal *p, NetHandler *nh) } void -QUICPollCont::_process_short_header_packet(UDPPacketInternal *p, NetHandler *nh) +QUICPollCont::_process_short_header_packet(QUICPollEvent *e, NetHandler *nh) { - QUICNetVConnection *vc; uint8_t *buf; + UDPPacketInternal *p = e->packet; + QUICNetVConnection *vc = static_cast(e->data.ptr); + buf = (uint8_t *)p->getIOBlockChain()->buf(); - vc = static_cast(p->data.ptr); - buf = (uint8_t *)p->getIOBlockChain()->buf(); + e->free(); if (!QUICTypeUtil::has_connection_id(reinterpret_cast(buf))) { // TODO: Some packets may not have connection id p->free(); @@ -124,22 +134,22 @@ QUICPollCont::pollEvent(int, Event *) { ink_assert(this->mutex->thread_holding == this_thread()); uint8_t *buf; - UDPPacketInternal *p = nullptr; - NetHandler *nh = get_NetHandler(this->mutex->thread_holding); + QUICPollEvent *e; + NetHandler *nh = get_NetHandler(this->mutex->thread_holding); // Process the ASLL - SList(UDPPacketInternal, alink) aq(inQueue.popall()); - Queue result; - while ((p = aq.pop())) { - result.push(p); + SList(QUICPollEvent, link) aq(inQueue.popall()); + Queue result; + while ((e = aq.pop())) { + result.push(e); } - while ((p = result.pop())) { - buf = (uint8_t *)p->getIOBlockChain()->buf(); + while ((e = result.pop())) { + buf = (uint8_t *)e->packet->getIOBlockChain()->buf(); if (QUICTypeUtil::has_long_header(buf)) { // Long Header Packet with Connection ID, has a valid type value. - this->_process_long_header_packet(p, nh); + this->_process_long_header_packet(e, nh); } else { // Short Header Packet with Connection ID, has a valid type value. - this->_process_short_header_packet(p, nh); + this->_process_short_header_packet(e, nh); } } @@ -154,5 +164,5 @@ initialize_thread_for_quic_net(EThread *thread) new ((ink_dummy_for_new *)quicpc) QUICPollCont(thread->mutex, nh); - thread->schedule_every(quicpc, -UDP_PERIOD); + thread->schedule_every(quicpc, -HRTIME_MSECONDS(UDP_PERIOD)); } diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc index a09c3941f5e..5eb99662816 100644 --- a/iocore/net/QUICPacketHandler.cc +++ b/iocore/net/QUICPacketHandler.cc @@ -124,7 +124,8 @@ QUICPacketHandlerIn::init_accept(EThread *t = nullptr) void QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) { - EThread *eth; + EThread *eth = nullptr; + QUICPollEvent *qe = nullptr; QUICNetVConnection *vc = nullptr; IOBufferBlock *block = udp_packet->getIOBlockChain(); @@ -185,10 +186,13 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) eth = vc->thread; } - // Push the packet into QUICPollCont - udp_packet->data.ptr = vc; + qe = quicPollEventAllocator.alloc(); + + qe->data.ptr = vc; // should we use dynamic_cast ?? - get_QUICPollCont(eth)->inQueue.push(static_cast(udp_packet)); + qe->packet = static_cast(udp_packet); + // Push the packet into QUICPollCont + get_QUICPollCont(eth)->inQueue.push(qe); } // TODO: Should be called via eventProcessor?