From 7180c2221bd4cb78c928857ae7c2c277a50748c5 Mon Sep 17 00:00:00 2001 From: scw00 Date: Fri, 22 Nov 2019 15:35:12 +0800 Subject: [PATCH 01/12] Introduce NetEventHandler to split UnixNetVC and NetHandler --- iocore/net/NetEventHandler.h | 96 +++++++++++++++++++++++++++++++ iocore/net/P_UnixNet.h | 85 ++++++++++++++------------- iocore/net/P_UnixNetState.h | 6 +- iocore/net/P_UnixNetVConnection.h | 86 ++++++++++++++++----------- iocore/net/UnixNet.cc | 84 +++++++++++++-------------- iocore/net/UnixNetPages.cc | 10 ++-- iocore/net/UnixNetVConnection.cc | 9 ++- 7 files changed, 247 insertions(+), 129 deletions(-) create mode 100644 iocore/net/NetEventHandler.h diff --git a/iocore/net/NetEventHandler.h b/iocore/net/NetEventHandler.h new file mode 100644 index 00000000000..7b74186dc7c --- /dev/null +++ b/iocore/net/NetEventHandler.h @@ -0,0 +1,96 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include "I_EventSystem.h" + +class NetHandler; + +// this class is used to NetHandler to hide some detail of NetEventHandler. +// To combine the `UDPConenction` and `NetEventHandler`. NetHandler should +// callback to net_read_io or net_write_io when net event happen. +class NetEventHandler +{ +public: + NetEventHandler() = default; + virtual ~NetEventHandler() {} + virtual void net_read_io(NetHandler *nh, EThread *lthread) = 0; + virtual void net_write_io(NetHandler *nh, EThread *lthread) = 0; + virtual void free(EThread *t) = 0; + + // since we want this class to be independent from VConnection, Continutaion. There should be + // a pure virtual function which connect sub class and NetHandler. + virtual int callback(int event = CONTINUATION_EVENT_NONE, void *data = nullptr) = 0; + + // Duplicate with `NetVConnection::set_inactivity_timeout` + // TODO: more abstraction. + virtual void set_inactivity_timeout(ink_hrtime timeout_in) = 0; + + // get this vc's thread + virtual EThread *get_thread() = 0; + + // Close when EventIO close; + virtual int close() = 0; + + // get fd + virtual int get_fd() = 0; + virtual Ptr &get_mutex() = 0; + virtual ContFlags &get_control_flags() = 0; + virtual sockaddr const *get_remote_addr() = 0; + virtual const NetVCOptions &get_options() = 0; + + EventIO ep{}; + NetState read{}; + NetState write{}; + + bool closed = false; + NetHandler *nh = nullptr; + + unsigned int id = 0; + + ink_hrtime inactivity_timeout_in = 0; + ink_hrtime active_timeout_in = 0; + ink_hrtime next_inactivity_timeout_at = 0; + ink_hrtime next_activity_timeout_at = 0; + ink_hrtime submit_time = 0; + + LINK(NetEventHandler, open_link); + LINK(NetEventHandler, cop_link); + LINKM(NetEventHandler, read, ready_link) + SLINKM(NetEventHandler, read, enable_link) + LINKM(NetEventHandler, write, ready_link) + SLINKM(NetEventHandler, write, enable_link) + LINK(NetEventHandler, keep_alive_queue_link); + LINK(NetEventHandler, active_queue_link); + + union { + unsigned int flags = 0; +#define NET_VC_SHUTDOWN_READ 1 +#define NET_VC_SHUTDOWN_WRITE 2 + struct { + unsigned int got_local_addr : 1; + unsigned int shutdown : 2; + } f; + }; +}; diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index 91ccb6fec0d..e477de77c4c 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -75,7 +75,7 @@ struct PollDescriptor; typedef PollDescriptor *EventLoop; -class UnixNetVConnection; +class NetEventHandler; class UnixUDPConnection; struct DNSConnection; struct NetAccept; @@ -89,14 +89,14 @@ struct EventIO { int type = 0; union { Continuation *c; - UnixNetVConnection *vc; + NetEventHandler *vc; DNSConnection *dnscon; NetAccept *na; UnixUDPConnection *uc; } data; int start(EventLoop l, DNSConnection *vc, int events); int start(EventLoop l, NetAccept *vc, int events); - int start(EventLoop l, UnixNetVConnection *vc, int events); + int start(EventLoop l, NetEventHandler *vc, int events); int start(EventLoop l, UnixUDPConnection *vc, int events); int start(EventLoop l, int fd, Continuation *c, int events); // Change the existing events by adding modify(EVENTIO_READ) @@ -118,7 +118,7 @@ struct EventIO { #include "P_UnixPollDescriptor.h" #include -class UnixNetVConnection; +class NetEventHandler; class NetHandler; typedef int (NetHandler::*NetContHandler)(int, void *); typedef unsigned int uint32; @@ -236,15 +236,15 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler // If we don't get rid of @a trigger_event we should remove @a thread. EThread *thread = nullptr; Event *trigger_event = nullptr; - QueM(UnixNetVConnection, NetState, read, ready_link) read_ready_list; - QueM(UnixNetVConnection, NetState, write, ready_link) write_ready_list; - Que(UnixNetVConnection, link) open_list; - DList(UnixNetVConnection, cop_link) cop_list; - ASLLM(UnixNetVConnection, NetState, read, enable_link) read_enable_list; - ASLLM(UnixNetVConnection, NetState, write, enable_link) write_enable_list; - Que(UnixNetVConnection, keep_alive_queue_link) keep_alive_queue; + QueM(NetEventHandler, NetState, read, ready_link) read_ready_list; + QueM(NetEventHandler, NetState, write, ready_link) write_ready_list; + Que(NetEventHandler, open_link) open_list; + DList(NetEventHandler, cop_link) cop_list; + ASLLM(NetEventHandler, NetState, read, enable_link) read_enable_list; + ASLLM(NetEventHandler, NetState, write, enable_link) write_enable_list; + Que(NetEventHandler, keep_alive_queue_link) keep_alive_queue; uint32_t keep_alive_queue_size = 0; - Que(UnixNetVConnection, active_queue_link) active_queue; + Que(NetEventHandler, active_queue_link) active_queue; uint32_t active_queue_size = 0; /// configuration settings for managing the active and keep-alive queues @@ -293,10 +293,10 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler void process_ready_list(); void manage_keep_alive_queue(); bool manage_active_queue(bool ignore_queue_size); - void add_to_keep_alive_queue(UnixNetVConnection *vc); - void remove_from_keep_alive_queue(UnixNetVConnection *vc); - bool add_to_active_queue(UnixNetVConnection *vc); - void remove_from_active_queue(UnixNetVConnection *vc); + void add_to_keep_alive_queue(NetEventHandler *vc); + void remove_from_keep_alive_queue(NetEventHandler *vc); + bool add_to_active_queue(NetEventHandler *vc); + void remove_from_active_queue(NetEventHandler *vc); /// Per process initialization logic. static void init_for_process(); @@ -304,42 +304,42 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler void configure_per_thread_values(); /** - Start to handle read & write event on a UnixNetVConnection. + Start to handle read & write event on a NetEventHandler. Initial the socket fd of netvc for polling system. Only be called when holding the mutex of this NetHandler. - @param netvc UnixNetVConnection to be managed by this NetHandler. + @param netvc NetEventHandler to be managed by this NetHandler. @return 0 on success, netvc->nh set to this NetHandler. -ERRNO on failure. */ - int startIO(UnixNetVConnection *netvc); + int startIO(NetEventHandler *netvc); /** - Stop to handle read & write event on a UnixNetVConnection. + Stop to handle read & write event on a NetEventHandler. Remove the socket fd of netvc from polling system. Only be called when holding the mutex of this NetHandler and must call stopCop(netvc) first. - @param netvc UnixNetVConnection to be released. + @param netvc NetEventHandler to be released. @return netvc->nh set to nullptr. */ - void stopIO(UnixNetVConnection *netvc); + void stopIO(NetEventHandler *netvc); /** - Start to handle active timeout and inactivity timeout on a UnixNetVConnection. + Start to handle active timeout and inactivity timeout on a NetEventHandler. Put the netvc into open_list. All NetVCs in the open_list is checked for timeout by InactivityCop. Only be called when holding the mutex of this NetHandler and must call startIO(netvc) first. - @param netvc UnixNetVConnection to be managed by InactivityCop + @param netvc NetEventHandler to be managed by InactivityCop */ - void startCop(UnixNetVConnection *netvc); + void startCop(NetEventHandler *netvc); /** - Stop to handle active timeout and inactivity on a UnixNetVConnection. + Stop to handle active timeout and inactivity on a NetEventHandler. Remove the netvc from open_list and cop_list. Also remove the netvc from keep_alive_queue and active_queue if its context is IN. Only be called when holding the mutex of this NetHandler. - @param netvc UnixNetVConnection to be released. + @param netvc NetEventHandler to be released. */ - void stopCop(UnixNetVConnection *netvc); + void stopCop(NetEventHandler *netvc); // Signal the epoll_wait to terminate. void signalActivity() override; @@ -347,15 +347,14 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler /** Release a netvc and free it. - @param netvc UnixNetVConnection to be detached. + @param netvc NetEventHandler to be detached. */ - void free_netvc(UnixNetVConnection *netvc); + void free_netvc(NetEventHandler *netvc); NetHandler(); private: - void _close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, - int &total_idle_count); + void _close_vc(NetEventHandler *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count); /// Static method used as the callback for runtime configuration updates. static int update_nethandler_config(const char *name, RecDataT, RecData data, void *); @@ -507,10 +506,10 @@ check_transient_accept_error(int res) } // -// Disable a UnixNetVConnection +// Disable a NetEventHandler // static inline void -read_disable(NetHandler *nh, UnixNetVConnection *vc) +read_disable(NetHandler *nh, NetEventHandler *vc) { if (!vc->write.enabled) { vc->set_inactivity_timeout(0); @@ -522,7 +521,7 @@ read_disable(NetHandler *nh, UnixNetVConnection *vc) } static inline void -write_disable(NetHandler *nh, UnixNetVConnection *vc) +write_disable(NetHandler *nh, NetEventHandler *vc) { if (!vc->read.enabled) { vc->set_inactivity_timeout(0); @@ -546,10 +545,10 @@ EventIO::start(EventLoop l, NetAccept *vc, int events) return start(l, vc->server.fd, (Continuation *)vc, events); } TS_INLINE int -EventIO::start(EventLoop l, UnixNetVConnection *vc, int events) +EventIO::start(EventLoop l, NetEventHandler *vc, int events) { type = EVENTIO_READWRITE_VC; - return start(l, vc->con.fd, (Continuation *)vc, events); + return start(l, vc->get_fd(), (Continuation *)vc, events); } TS_INLINE int EventIO::start(EventLoop l, UnixUDPConnection *vc, int events) @@ -576,7 +575,7 @@ EventIO::close() return data.na->server.close(); break; case EVENTIO_READWRITE_VC: - return data.vc->con.close(); + return data.vc->close(); break; } return -1; @@ -775,10 +774,10 @@ EventIO::stop() } TS_INLINE int -NetHandler::startIO(UnixNetVConnection *netvc) +NetHandler::startIO(NetEventHandler *netvc) { ink_assert(this->mutex->thread_holding == this_ethread()); - ink_assert(netvc->thread == this_ethread()); + ink_assert(netvc->get_thread() == this_ethread()); int res = 0; PollDescriptor *pd = get_PollDescriptor(this->thread); @@ -799,7 +798,7 @@ NetHandler::startIO(UnixNetVConnection *netvc) } TS_INLINE void -NetHandler::stopIO(UnixNetVConnection *netvc) +NetHandler::stopIO(NetEventHandler *netvc) { ink_release_assert(netvc->nh == this); @@ -820,7 +819,7 @@ NetHandler::stopIO(UnixNetVConnection *netvc) } TS_INLINE void -NetHandler::startCop(UnixNetVConnection *netvc) +NetHandler::startCop(NetEventHandler *netvc) { ink_assert(this->mutex->thread_holding == this_ethread()); ink_release_assert(netvc->nh == this); @@ -830,7 +829,7 @@ NetHandler::startCop(UnixNetVConnection *netvc) } TS_INLINE void -NetHandler::stopCop(UnixNetVConnection *netvc) +NetHandler::stopCop(NetEventHandler *netvc) { ink_release_assert(netvc->nh == this); diff --git a/iocore/net/P_UnixNetState.h b/iocore/net/P_UnixNetState.h index 3a2265c92c9..8bdd7d7b33d 100644 --- a/iocore/net/P_UnixNetState.h +++ b/iocore/net/P_UnixNetState.h @@ -40,13 +40,13 @@ #include "I_VIO.h" class Event; -class UnixNetVConnection; +class NetEventHandler; struct NetState { int enabled = 0; VIO vio; - Link ready_link; - SLink enable_link; + Link ready_link; + SLink enable_link; int in_enabled_list = 0; int triggered = 0; diff --git a/iocore/net/P_UnixNetVConnection.h b/iocore/net/P_UnixNetVConnection.h index 26d573de515..06b69b1e6af 100644 --- a/iocore/net/P_UnixNetVConnection.h +++ b/iocore/net/P_UnixNetVConnection.h @@ -36,6 +36,7 @@ #include "P_UnixNetState.h" #include "P_Connection.h" #include "P_NetAccept.h" +#include "NetEventHandler.h" class UnixNetVConnection; class NetHandler; @@ -104,7 +105,7 @@ struct OOB_callback : public Continuation { enum tcp_congestion_control_t { CLIENT_SIDE, SERVER_SIDE }; -class UnixNetVConnection : public NetVConnection +class UnixNetVConnection : public NetVConnection, public NetEventHandler { public: int64_t outstanding() override; @@ -216,7 +217,56 @@ class UnixNetVConnection : public NetVConnection return false; } - virtual void net_read_io(NetHandler *nh, EThread *lthread); + // NetEventHandler + virtual void net_read_io(NetHandler *nh, EThread *lthread) override; + virtual void net_write_io(NetHandler *nh, EThread *lthread) override; + virtual void free(EThread *t) override; + virtual int + close() override + { + return this->con.close(); + } + virtual int + get_fd() override + { + return this->con.fd; + } + + virtual EThread * + get_thread() override + { + return this->thread; + } + + virtual int + callback(int event = CONTINUATION_EVENT_NONE, void *data = nullptr) override + { + return this->handleEvent(event, data); + } + + virtual Ptr & + get_mutex() override + { + return this->mutex; + } + + virtual ContFlags & + get_control_flags() override + { + return this->control_flags; + } + + virtual sockaddr const * + get_remote_addr() override + { + return NetVConnection::get_remote_addr(); + } + virtual const NetVCOptions & + get_options() override + { + return this->options; + } + virtual int64_t load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, int64_t &total_written, int &needs); void readDisable(NetHandler *nh); void readSignalError(NetHandler *nh, int err); @@ -233,40 +283,9 @@ class UnixNetVConnection : public NetVConnection UnixNetVConnection *migrateToCurrentThread(Continuation *c, EThread *t); Action action_; - int closed = 0; - NetState read; - NetState write; - - LINK(UnixNetVConnection, cop_link); - LINKM(UnixNetVConnection, read, ready_link) - SLINKM(UnixNetVConnection, read, enable_link) - LINKM(UnixNetVConnection, write, ready_link) - SLINKM(UnixNetVConnection, write, enable_link) - LINK(UnixNetVConnection, keep_alive_queue_link); - LINK(UnixNetVConnection, active_queue_link); - - ink_hrtime inactivity_timeout_in = 0; - ink_hrtime active_timeout_in = 0; - ink_hrtime next_inactivity_timeout_at = 0; - ink_hrtime next_activity_timeout_at = 0; - - EventIO ep; - NetHandler *nh = nullptr; - unsigned int id = 0; - - union { - unsigned int flags; -#define NET_VC_SHUTDOWN_READ 1 -#define NET_VC_SHUTDOWN_WRITE 2 - struct { - unsigned int got_local_addr : 1; - unsigned int shutdown : 2; - } f; - }; Connection con; int recursion = 0; - ink_hrtime submit_time = 0; OOB_callback *oob_ptr = nullptr; bool from_accept_thread = false; NetAccept *accept_object = nullptr; @@ -287,7 +306,6 @@ class UnixNetVConnection : public NetVConnection */ virtual int populate(Connection &con, Continuation *c, void *arg); virtual void clear(); - virtual void free(EThread *t); ink_hrtime get_inactivity_timeout() override; ink_hrtime get_active_timeout() override; diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc index 76900778a46..ec6fad08b43 100644 --- a/iocore/net/UnixNet.cc +++ b/iocore/net/UnixNet.cc @@ -56,9 +56,9 @@ class InactivityCop : public Continuation Debug("inactivity_cop_check", "Checking inactivity on Thread-ID #%d", this_ethread()->id); // The rest NetVCs in cop_list which are not triggered between InactivityCop runs. // Use pop() to catch any closes caused by callbacks. - while (UnixNetVConnection *vc = nh.cop_list.pop()) { + while (NetEventHandler *vc = nh.cop_list.pop()) { // If we cannot get the lock don't stop just keep cleaning - MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread()); + MUTEX_TRY_LOCK(lock, vc->get_mutex(), this_ethread()); if (!lock.is_locked()) { NET_INCREMENT_DYN_STAT(inactivity_cop_lock_acquire_failure_stat); continue; @@ -78,18 +78,18 @@ class InactivityCop : public Continuation } Debug("inactivity_cop_verbose", "vc: %p now: %" PRId64 " timeout at: %" PRId64 " timeout in: %" PRId64, vc, ink_hrtime_to_sec(now), vc->next_inactivity_timeout_at, vc->inactivity_timeout_in); - vc->handleEvent(VC_EVENT_INACTIVITY_TIMEOUT, e); + vc->callback(VC_EVENT_INACTIVITY_TIMEOUT, e); } else if (vc->next_activity_timeout_at && vc->next_activity_timeout_at < now) { Debug("inactivity_cop_verbose", "active vc: %p now: %" PRId64 " timeout at: %" PRId64 " timeout in: %" PRId64, vc, ink_hrtime_to_sec(now), vc->next_activity_timeout_at, vc->active_timeout_in); - vc->handleEvent(VC_EVENT_ACTIVE_TIMEOUT, e); + vc->callback(VC_EVENT_ACTIVE_TIMEOUT, e); } } // The cop_list is empty now. // Let's reload the cop_list from open_list again. - forl_LL(UnixNetVConnection, vc, nh.open_list) + forl_LL(NetEventHandler, vc, nh.open_list) { - if (vc->thread == this_ethread()) { + if (vc->get_thread() == this_ethread()) { nh.cop_list.push(vc); } } @@ -333,15 +333,15 @@ NetHandler::init_for_process() } // -// Function used to release a UnixNetVConnection and free it. +// Function used to release a NetEventHandler and free it. // void -NetHandler::free_netvc(UnixNetVConnection *netvc) +NetHandler::free_netvc(NetEventHandler *netvc) { EThread *t = this->thread; ink_assert(t == this_ethread()); - ink_release_assert(netvc->thread == t); + ink_release_assert(netvc->get_thread() == t); ink_release_assert(netvc->nh == this); // Release netvc from InactivityCop @@ -358,9 +358,9 @@ NetHandler::free_netvc(UnixNetVConnection *netvc) void NetHandler::process_enabled_list() { - UnixNetVConnection *vc = nullptr; + NetEventHandler *vc = nullptr; - SListM(UnixNetVConnection, NetState, read, enable_link) rq(read_enable_list.popall()); + SListM(NetEventHandler, NetState, read, enable_link) rq(read_enable_list.popall()); while ((vc = rq.pop())) { vc->ep.modify(EVENTIO_READ); vc->ep.refresh(EVENTIO_READ); @@ -370,7 +370,7 @@ NetHandler::process_enabled_list() } } - SListM(UnixNetVConnection, NetState, write, enable_link) wq(write_enable_list.popall()); + SListM(NetEventHandler, NetState, write, enable_link) wq(write_enable_list.popall()); while ((vc = wq.pop())) { vc->ep.modify(EVENTIO_WRITE); vc->ep.refresh(EVENTIO_WRITE); @@ -387,13 +387,13 @@ NetHandler::process_enabled_list() void NetHandler::process_ready_list() { - UnixNetVConnection *vc = nullptr; + NetEventHandler *vc = nullptr; #if defined(USE_EDGE_TRIGGER) - // UnixNetVConnection * + // NetEventHandler * while ((vc = read_ready_list.dequeue())) { // Initialize the thread-local continuation flags - set_cont_flags(vc->control_flags); + set_cont_flags(vc->get_control_flags()); if (vc->closed) { free_netvc(vc); } else if (vc->read.enabled && vc->read.triggered) { @@ -410,11 +410,11 @@ NetHandler::process_ready_list() } } while ((vc = write_ready_list.dequeue())) { - set_cont_flags(vc->control_flags); + set_cont_flags(vc->get_control_flags()); if (vc->closed) { free_netvc(vc); } else if (vc->write.enabled && vc->write.triggered) { - write_to_net(this, vc, this->thread); + vc->net_write_io(this, this->thread); } else if (!vc->write.enabled) { write_ready_list.remove(vc); #if defined(solaris) @@ -428,7 +428,7 @@ NetHandler::process_ready_list() } #else /* !USE_EDGE_TRIGGER */ while ((vc = read_ready_list.dequeue())) { - set_cont_flags(vc->control_flags); + set_cont_flags(vc->get_control_flags()); if (vc->closed) free_netvc(vc); else if (vc->read.enabled && vc->read.triggered) @@ -437,7 +437,7 @@ NetHandler::process_ready_list() vc->ep.modify(-EVENTIO_READ); } while ((vc = write_ready_list.dequeue())) { - set_cont_flags(vc->control_flags); + set_cont_flags(vc->get_control_flags()); if (vc->closed) free_netvc(vc); else if (vc->write.enabled && vc->write.triggered) @@ -482,8 +482,8 @@ NetHandler::waitForActivity(ink_hrtime timeout) p->do_poll(timeout); // Get & Process polling result - PollDescriptor *pd = get_PollDescriptor(this->thread); - UnixNetVConnection *vc = nullptr; + PollDescriptor *pd = get_PollDescriptor(this->thread); + NetEventHandler *vc = nullptr; for (int x = 0; x < pd->result; x++) { epd = static_cast get_ev_data(pd, x); if (epd->type == EVENTIO_READWRITE_VC) { @@ -569,12 +569,12 @@ NetHandler::manage_active_queue(bool ignore_queue_size = false) ink_hrtime now = Thread::get_hrtime(); // loop over the non-active connections and try to close them - UnixNetVConnection *vc = active_queue.head; - UnixNetVConnection *vc_next = nullptr; - int closed = 0; - int handle_event = 0; - int total_idle_time = 0; - int total_idle_count = 0; + NetEventHandler *vc = active_queue.head; + NetEventHandler *vc_next = nullptr; + int closed = 0; + int handle_event = 0; + int total_idle_time = 0; + int total_idle_count = 0; for (; vc != nullptr; vc = vc_next) { vc_next = vc->active_queue_link.next; if ((vc->inactivity_timeout_in && vc->next_inactivity_timeout_at <= now) || @@ -619,12 +619,12 @@ NetHandler::manage_keep_alive_queue() } // loop over the non-active connections and try to close them - UnixNetVConnection *vc_next = nullptr; - int closed = 0; - int handle_event = 0; - int total_idle_time = 0; - int total_idle_count = 0; - for (UnixNetVConnection *vc = keep_alive_queue.head; vc != nullptr; vc = vc_next) { + NetEventHandler *vc_next = nullptr; + int closed = 0; + int handle_event = 0; + int total_idle_time = 0; + int total_idle_count = 0; + for (NetEventHandler *vc = keep_alive_queue.head; vc != nullptr; vc = vc_next) { vc_next = vc->keep_alive_queue_link.next; _close_vc(vc, now, handle_event, closed, total_idle_time, total_idle_count); @@ -642,13 +642,13 @@ NetHandler::manage_keep_alive_queue() } void -NetHandler::_close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, +NetHandler::_close_vc(NetEventHandler *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count) { - if (vc->thread != this_ethread()) { + if (vc->get_thread() != this_ethread()) { return; } - MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread()); + MUTEX_TRY_LOCK(lock, vc->get_mutex(), this_ethread()); if (!lock.is_locked()) { return; } @@ -671,11 +671,11 @@ NetHandler::_close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, Event event; event.ethread = this_ethread(); if (vc->inactivity_timeout_in && vc->next_inactivity_timeout_at <= now) { - if (vc->handleEvent(VC_EVENT_INACTIVITY_TIMEOUT, &event) == EVENT_DONE) { + if (vc->callback(VC_EVENT_INACTIVITY_TIMEOUT, &event) == EVENT_DONE) { ++handle_event; } } else if (vc->active_timeout_in && vc->next_activity_timeout_at <= now) { - if (vc->handleEvent(VC_EVENT_ACTIVE_TIMEOUT, &event) == EVENT_DONE) { + if (vc->callback(VC_EVENT_ACTIVE_TIMEOUT, &event) == EVENT_DONE) { ++handle_event; } } @@ -683,7 +683,7 @@ NetHandler::_close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, } void -NetHandler::add_to_keep_alive_queue(UnixNetVConnection *vc) +NetHandler::add_to_keep_alive_queue(NetEventHandler *vc) { Debug("net_queue", "NetVC: %p", vc); ink_assert(mutex->thread_holding == this_ethread()); @@ -703,7 +703,7 @@ NetHandler::add_to_keep_alive_queue(UnixNetVConnection *vc) } void -NetHandler::remove_from_keep_alive_queue(UnixNetVConnection *vc) +NetHandler::remove_from_keep_alive_queue(NetEventHandler *vc) { Debug("net_queue", "NetVC: %p", vc); ink_assert(mutex->thread_holding == this_ethread()); @@ -715,7 +715,7 @@ NetHandler::remove_from_keep_alive_queue(UnixNetVConnection *vc) } bool -NetHandler::add_to_active_queue(UnixNetVConnection *vc) +NetHandler::add_to_active_queue(NetEventHandler *vc) { Debug("net_queue", "NetVC: %p", vc); Debug("net_queue", "max_connections_per_thread_in: %d active_queue_size: %d keep_alive_queue_size: %d", @@ -742,7 +742,7 @@ NetHandler::add_to_active_queue(UnixNetVConnection *vc) } void -NetHandler::remove_from_active_queue(UnixNetVConnection *vc) +NetHandler::remove_from_active_queue(NetEventHandler *vc) { Debug("net_queue", "NetVC: %p", vc); ink_assert(mutex->thread_holding == this_ethread()); diff --git a/iocore/net/UnixNetPages.cc b/iocore/net/UnixNetPages.cc index bd8d64002bd..fe7aa9651d7 100644 --- a/iocore/net/UnixNetPages.cc +++ b/iocore/net/UnixNetPages.cc @@ -61,7 +61,7 @@ struct ShowNet : public ShowCont { } ink_hrtime now = Thread::get_hrtime(); - forl_LL(UnixNetVConnection, vc, nh->open_list) + forl_LL(NetEventHandler, vc, nh->open_list) { // uint16_t port = ats_ip_port_host_order(&addr.sa); if (ats_is_ip(&addr) && !ats_ip_addr_port_eq(&addr.sa, vc->get_remote_addr())) { @@ -73,8 +73,8 @@ struct ShowNet : public ShowCont { ats_ip_ntop(vc->get_remote_addr(), ipbuf, sizeof(ipbuf)); char opt_ipbuf[INET6_ADDRSTRLEN]; char interbuf[80]; - snprintf(interbuf, sizeof(interbuf), "[%s] %s:%d", vc->options.toString(vc->options.addr_binding), - vc->options.local_ip.toString(opt_ipbuf, sizeof(opt_ipbuf)), vc->options.local_port); + snprintf(interbuf, sizeof(interbuf), "[%s] %s:%d", vc->get_options().toString(vc->get_options().addr_binding), + vc->get_options().local_ip.toString(opt_ipbuf, sizeof(opt_ipbuf)), vc->get_options().local_port); CHECK_SHOW(show("" //"%d" "%d" // ID @@ -96,7 +96,7 @@ struct ShowNet : public ShowCont { "%d" // shutdown "-%s" // comments "\n", - vc->id, ipbuf, ats_ip_port_host_order(vc->get_remote_addr()), vc->con.fd, interbuf, + vc->id, ipbuf, ats_ip_port_host_order(vc->get_remote_addr()), vc->get_fd(), interbuf, // vc->accept_port, (int)((now - vc->submit_time) / HRTIME_SECOND), ethread->id, vc->read.enabled, vc->read.vio.nbytes, vc->read.vio.ndone, vc->write.enabled, vc->write.vio.nbytes, vc->write.vio.ndone, @@ -158,7 +158,7 @@ struct ShowNet : public ShowCont { CHECK_SHOW(show("

Thread: %d

\n", ithread)); CHECK_SHOW(show("\n")); int connections = 0; - forl_LL(UnixNetVConnection, vc, nh->open_list) connections++; + forl_LL(NetEventHandler, vc, nh->open_list) connections++; CHECK_SHOW(show("\n", "Connections", connections)); // CHECK_SHOW(show("\n", "Last Poll Size", pollDescriptor->nfds)); CHECK_SHOW(show("\n", "Last Poll Ready", pollDescriptor->result)); diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc index 448ae426f14..a97a845bcd3 100644 --- a/iocore/net/UnixNetVConnection.cc +++ b/iocore/net/UnixNetVConnection.cc @@ -866,8 +866,7 @@ UnixNetVConnection::reenable_re(VIO *vio) } } -UnixNetVConnection::UnixNetVConnection() : flags(0) - +UnixNetVConnection::UnixNetVConnection() { SET_HANDLER((NetVConnHandler)&UnixNetVConnection::startEvent); } @@ -891,6 +890,12 @@ UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread) read_from_net(nh, this, lthread); } +void +UnixNetVConnection::net_write_io(NetHandler *nh, EThread *lthread) +{ + write_to_net(nh, this, lthread); +} + // This code was pulled out of write_to_net so // I could overwrite it for the SSL implementation // (SSL read does not support overlapped i/o) From 8c756ff0bf7ffd0b7253e7cd4530c48354c04496 Mon Sep 17 00:00:00 2001 From: scw00 Date: Tue, 26 Nov 2019 15:54:13 +0800 Subject: [PATCH 02/12] rename NetEventHandler to NetEvent --- iocore/net/{NetEventHandler.h => NetEvent.h} | 26 +++---- iocore/net/P_UnixNet.h | 78 ++++++++++---------- iocore/net/P_UnixNetState.h | 6 +- iocore/net/P_UnixNetVConnection.h | 6 +- iocore/net/UnixNet.cc | 57 +++++++------- iocore/net/UnixNetPages.cc | 6 +- 6 files changed, 89 insertions(+), 90 deletions(-) rename iocore/net/{NetEventHandler.h => NetEvent.h} (83%) diff --git a/iocore/net/NetEventHandler.h b/iocore/net/NetEvent.h similarity index 83% rename from iocore/net/NetEventHandler.h rename to iocore/net/NetEvent.h index 7b74186dc7c..839dfe41787 100644 --- a/iocore/net/NetEventHandler.h +++ b/iocore/net/NetEvent.h @@ -27,14 +27,14 @@ class NetHandler; -// this class is used to NetHandler to hide some detail of NetEventHandler. -// To combine the `UDPConenction` and `NetEventHandler`. NetHandler should +// this class is used to NetHandler to hide some detail of NetEvent. +// To combine the `UDPConenction` and `NetEvent`. NetHandler should // callback to net_read_io or net_write_io when net event happen. -class NetEventHandler +class NetEvent { public: - NetEventHandler() = default; - virtual ~NetEventHandler() {} + NetEvent() = default; + virtual ~NetEvent() {} virtual void net_read_io(NetHandler *nh, EThread *lthread) = 0; virtual void net_write_io(NetHandler *nh, EThread *lthread) = 0; virtual void free(EThread *t) = 0; @@ -75,14 +75,14 @@ class NetEventHandler ink_hrtime next_activity_timeout_at = 0; ink_hrtime submit_time = 0; - LINK(NetEventHandler, open_link); - LINK(NetEventHandler, cop_link); - LINKM(NetEventHandler, read, ready_link) - SLINKM(NetEventHandler, read, enable_link) - LINKM(NetEventHandler, write, ready_link) - SLINKM(NetEventHandler, write, enable_link) - LINK(NetEventHandler, keep_alive_queue_link); - LINK(NetEventHandler, active_queue_link); + LINK(NetEvent, open_link); + LINK(NetEvent, cop_link); + LINKM(NetEvent, read, ready_link) + SLINKM(NetEvent, read, enable_link) + LINKM(NetEvent, write, ready_link) + SLINKM(NetEvent, write, enable_link) + LINK(NetEvent, keep_alive_queue_link); + LINK(NetEvent, active_queue_link); union { unsigned int flags = 0; diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index e477de77c4c..18802fa0349 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -75,7 +75,7 @@ struct PollDescriptor; typedef PollDescriptor *EventLoop; -class NetEventHandler; +class NetEvent; class UnixUDPConnection; struct DNSConnection; struct NetAccept; @@ -89,14 +89,14 @@ struct EventIO { int type = 0; union { Continuation *c; - NetEventHandler *vc; + NetEvent *vc; DNSConnection *dnscon; NetAccept *na; UnixUDPConnection *uc; } data; int start(EventLoop l, DNSConnection *vc, int events); int start(EventLoop l, NetAccept *vc, int events); - int start(EventLoop l, NetEventHandler *vc, int events); + int start(EventLoop l, NetEvent *vc, int events); int start(EventLoop l, UnixUDPConnection *vc, int events); int start(EventLoop l, int fd, Continuation *c, int events); // Change the existing events by adding modify(EVENTIO_READ) @@ -118,7 +118,7 @@ struct EventIO { #include "P_UnixPollDescriptor.h" #include -class NetEventHandler; +class NetEvent; class NetHandler; typedef int (NetHandler::*NetContHandler)(int, void *); typedef unsigned int uint32; @@ -236,15 +236,15 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler // If we don't get rid of @a trigger_event we should remove @a thread. EThread *thread = nullptr; Event *trigger_event = nullptr; - QueM(NetEventHandler, NetState, read, ready_link) read_ready_list; - QueM(NetEventHandler, NetState, write, ready_link) write_ready_list; - Que(NetEventHandler, open_link) open_list; - DList(NetEventHandler, cop_link) cop_list; - ASLLM(NetEventHandler, NetState, read, enable_link) read_enable_list; - ASLLM(NetEventHandler, NetState, write, enable_link) write_enable_list; - Que(NetEventHandler, keep_alive_queue_link) keep_alive_queue; + QueM(NetEvent, NetState, read, ready_link) read_ready_list; + QueM(NetEvent, NetState, write, ready_link) write_ready_list; + Que(NetEvent, open_link) open_list; + DList(NetEvent, cop_link) cop_list; + ASLLM(NetEvent, NetState, read, enable_link) read_enable_list; + ASLLM(NetEvent, NetState, write, enable_link) write_enable_list; + Que(NetEvent, keep_alive_queue_link) keep_alive_queue; uint32_t keep_alive_queue_size = 0; - Que(NetEventHandler, active_queue_link) active_queue; + Que(NetEvent, active_queue_link) active_queue; uint32_t active_queue_size = 0; /// configuration settings for managing the active and keep-alive queues @@ -293,10 +293,10 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler void process_ready_list(); void manage_keep_alive_queue(); bool manage_active_queue(bool ignore_queue_size); - void add_to_keep_alive_queue(NetEventHandler *vc); - void remove_from_keep_alive_queue(NetEventHandler *vc); - bool add_to_active_queue(NetEventHandler *vc); - void remove_from_active_queue(NetEventHandler *vc); + void add_to_keep_alive_queue(NetEvent *vc); + void remove_from_keep_alive_queue(NetEvent *vc); + bool add_to_active_queue(NetEvent *vc); + void remove_from_active_queue(NetEvent *vc); /// Per process initialization logic. static void init_for_process(); @@ -304,42 +304,42 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler void configure_per_thread_values(); /** - Start to handle read & write event on a NetEventHandler. + Start to handle read & write event on a NetEvent. Initial the socket fd of netvc for polling system. Only be called when holding the mutex of this NetHandler. - @param netvc NetEventHandler to be managed by this NetHandler. + @param netvc NetEvent to be managed by this NetHandler. @return 0 on success, netvc->nh set to this NetHandler. -ERRNO on failure. */ - int startIO(NetEventHandler *netvc); + int startIO(NetEvent *netvc); /** - Stop to handle read & write event on a NetEventHandler. + Stop to handle read & write event on a NetEvent. Remove the socket fd of netvc from polling system. Only be called when holding the mutex of this NetHandler and must call stopCop(netvc) first. - @param netvc NetEventHandler to be released. + @param netvc NetEvent to be released. @return netvc->nh set to nullptr. */ - void stopIO(NetEventHandler *netvc); + void stopIO(NetEvent *netvc); /** - Start to handle active timeout and inactivity timeout on a NetEventHandler. + Start to handle active timeout and inactivity timeout on a NetEvent. Put the netvc into open_list. All NetVCs in the open_list is checked for timeout by InactivityCop. Only be called when holding the mutex of this NetHandler and must call startIO(netvc) first. - @param netvc NetEventHandler to be managed by InactivityCop + @param netvc NetEvent to be managed by InactivityCop */ - void startCop(NetEventHandler *netvc); + void startCop(NetEvent *netvc); /** - Stop to handle active timeout and inactivity on a NetEventHandler. + Stop to handle active timeout and inactivity on a NetEvent. Remove the netvc from open_list and cop_list. Also remove the netvc from keep_alive_queue and active_queue if its context is IN. Only be called when holding the mutex of this NetHandler. - @param netvc NetEventHandler to be released. + @param netvc NetEvent to be released. */ - void stopCop(NetEventHandler *netvc); + void stopCop(NetEvent *netvc); // Signal the epoll_wait to terminate. void signalActivity() override; @@ -347,14 +347,14 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler /** Release a netvc and free it. - @param netvc NetEventHandler to be detached. + @param netvc NetEvent to be detached. */ - void free_netvc(NetEventHandler *netvc); + void free_netvc(NetEvent *netvc); NetHandler(); private: - void _close_vc(NetEventHandler *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count); + void _close_vc(NetEvent *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count); /// Static method used as the callback for runtime configuration updates. static int update_nethandler_config(const char *name, RecDataT, RecData data, void *); @@ -506,10 +506,10 @@ check_transient_accept_error(int res) } // -// Disable a NetEventHandler +// Disable a NetEvent // static inline void -read_disable(NetHandler *nh, NetEventHandler *vc) +read_disable(NetHandler *nh, NetEvent *vc) { if (!vc->write.enabled) { vc->set_inactivity_timeout(0); @@ -521,7 +521,7 @@ read_disable(NetHandler *nh, NetEventHandler *vc) } static inline void -write_disable(NetHandler *nh, NetEventHandler *vc) +write_disable(NetHandler *nh, NetEvent *vc) { if (!vc->read.enabled) { vc->set_inactivity_timeout(0); @@ -545,7 +545,7 @@ EventIO::start(EventLoop l, NetAccept *vc, int events) return start(l, vc->server.fd, (Continuation *)vc, events); } TS_INLINE int -EventIO::start(EventLoop l, NetEventHandler *vc, int events) +EventIO::start(EventLoop l, NetEvent *vc, int events) { type = EVENTIO_READWRITE_VC; return start(l, vc->get_fd(), (Continuation *)vc, events); @@ -774,7 +774,7 @@ EventIO::stop() } TS_INLINE int -NetHandler::startIO(NetEventHandler *netvc) +NetHandler::startIO(NetEvent *netvc) { ink_assert(this->mutex->thread_holding == this_ethread()); ink_assert(netvc->get_thread() == this_ethread()); @@ -798,7 +798,7 @@ NetHandler::startIO(NetEventHandler *netvc) } TS_INLINE void -NetHandler::stopIO(NetEventHandler *netvc) +NetHandler::stopIO(NetEvent *netvc) { ink_release_assert(netvc->nh == this); @@ -819,7 +819,7 @@ NetHandler::stopIO(NetEventHandler *netvc) } TS_INLINE void -NetHandler::startCop(NetEventHandler *netvc) +NetHandler::startCop(NetEvent *netvc) { ink_assert(this->mutex->thread_holding == this_ethread()); ink_release_assert(netvc->nh == this); @@ -829,7 +829,7 @@ NetHandler::startCop(NetEventHandler *netvc) } TS_INLINE void -NetHandler::stopCop(NetEventHandler *netvc) +NetHandler::stopCop(NetEvent *netvc) { ink_release_assert(netvc->nh == this); diff --git a/iocore/net/P_UnixNetState.h b/iocore/net/P_UnixNetState.h index 8bdd7d7b33d..46fddbc7164 100644 --- a/iocore/net/P_UnixNetState.h +++ b/iocore/net/P_UnixNetState.h @@ -40,13 +40,13 @@ #include "I_VIO.h" class Event; -class NetEventHandler; +class NetEvent; struct NetState { int enabled = 0; VIO vio; - Link ready_link; - SLink enable_link; + Link ready_link; + SLink enable_link; int in_enabled_list = 0; int triggered = 0; diff --git a/iocore/net/P_UnixNetVConnection.h b/iocore/net/P_UnixNetVConnection.h index 06b69b1e6af..6297100bd4e 100644 --- a/iocore/net/P_UnixNetVConnection.h +++ b/iocore/net/P_UnixNetVConnection.h @@ -36,7 +36,7 @@ #include "P_UnixNetState.h" #include "P_Connection.h" #include "P_NetAccept.h" -#include "NetEventHandler.h" +#include "NetEvent.h" class UnixNetVConnection; class NetHandler; @@ -105,7 +105,7 @@ struct OOB_callback : public Continuation { enum tcp_congestion_control_t { CLIENT_SIDE, SERVER_SIDE }; -class UnixNetVConnection : public NetVConnection, public NetEventHandler +class UnixNetVConnection : public NetVConnection, public NetEvent { public: int64_t outstanding() override; @@ -217,7 +217,7 @@ class UnixNetVConnection : public NetVConnection, public NetEventHandler return false; } - // NetEventHandler + // NetEvent virtual void net_read_io(NetHandler *nh, EThread *lthread) override; virtual void net_write_io(NetHandler *nh, EThread *lthread) override; virtual void free(EThread *t) override; diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc index ec6fad08b43..39b8bedf3b2 100644 --- a/iocore/net/UnixNet.cc +++ b/iocore/net/UnixNet.cc @@ -56,7 +56,7 @@ class InactivityCop : public Continuation Debug("inactivity_cop_check", "Checking inactivity on Thread-ID #%d", this_ethread()->id); // The rest NetVCs in cop_list which are not triggered between InactivityCop runs. // Use pop() to catch any closes caused by callbacks. - while (NetEventHandler *vc = nh.cop_list.pop()) { + while (NetEvent *vc = nh.cop_list.pop()) { // If we cannot get the lock don't stop just keep cleaning MUTEX_TRY_LOCK(lock, vc->get_mutex(), this_ethread()); if (!lock.is_locked()) { @@ -87,7 +87,7 @@ class InactivityCop : public Continuation } // The cop_list is empty now. // Let's reload the cop_list from open_list again. - forl_LL(NetEventHandler, vc, nh.open_list) + forl_LL(NetEvent, vc, nh.open_list) { if (vc->get_thread() == this_ethread()) { nh.cop_list.push(vc); @@ -333,10 +333,10 @@ NetHandler::init_for_process() } // -// Function used to release a NetEventHandler and free it. +// Function used to release a NetEvent and free it. // void -NetHandler::free_netvc(NetEventHandler *netvc) +NetHandler::free_netvc(NetEvent *netvc) { EThread *t = this->thread; @@ -358,9 +358,9 @@ NetHandler::free_netvc(NetEventHandler *netvc) void NetHandler::process_enabled_list() { - NetEventHandler *vc = nullptr; + NetEvent *vc = nullptr; - SListM(NetEventHandler, NetState, read, enable_link) rq(read_enable_list.popall()); + SListM(NetEvent, NetState, read, enable_link) rq(read_enable_list.popall()); while ((vc = rq.pop())) { vc->ep.modify(EVENTIO_READ); vc->ep.refresh(EVENTIO_READ); @@ -370,7 +370,7 @@ NetHandler::process_enabled_list() } } - SListM(NetEventHandler, NetState, write, enable_link) wq(write_enable_list.popall()); + SListM(NetEvent, NetState, write, enable_link) wq(write_enable_list.popall()); while ((vc = wq.pop())) { vc->ep.modify(EVENTIO_WRITE); vc->ep.refresh(EVENTIO_WRITE); @@ -387,10 +387,10 @@ NetHandler::process_enabled_list() void NetHandler::process_ready_list() { - NetEventHandler *vc = nullptr; + NetEvent *vc = nullptr; #if defined(USE_EDGE_TRIGGER) - // NetEventHandler * + // NetEvent * while ((vc = read_ready_list.dequeue())) { // Initialize the thread-local continuation flags set_cont_flags(vc->get_control_flags()); @@ -482,8 +482,8 @@ NetHandler::waitForActivity(ink_hrtime timeout) p->do_poll(timeout); // Get & Process polling result - PollDescriptor *pd = get_PollDescriptor(this->thread); - NetEventHandler *vc = nullptr; + PollDescriptor *pd = get_PollDescriptor(this->thread); + NetEvent *vc = nullptr; for (int x = 0; x < pd->result; x++) { epd = static_cast get_ev_data(pd, x); if (epd->type == EVENTIO_READWRITE_VC) { @@ -569,12 +569,12 @@ NetHandler::manage_active_queue(bool ignore_queue_size = false) ink_hrtime now = Thread::get_hrtime(); // loop over the non-active connections and try to close them - NetEventHandler *vc = active_queue.head; - NetEventHandler *vc_next = nullptr; - int closed = 0; - int handle_event = 0; - int total_idle_time = 0; - int total_idle_count = 0; + NetEvent *vc = active_queue.head; + NetEvent *vc_next = nullptr; + int closed = 0; + int handle_event = 0; + int total_idle_time = 0; + int total_idle_count = 0; for (; vc != nullptr; vc = vc_next) { vc_next = vc->active_queue_link.next; if ((vc->inactivity_timeout_in && vc->next_inactivity_timeout_at <= now) || @@ -619,12 +619,12 @@ NetHandler::manage_keep_alive_queue() } // loop over the non-active connections and try to close them - NetEventHandler *vc_next = nullptr; - int closed = 0; - int handle_event = 0; - int total_idle_time = 0; - int total_idle_count = 0; - for (NetEventHandler *vc = keep_alive_queue.head; vc != nullptr; vc = vc_next) { + NetEvent *vc_next = nullptr; + int closed = 0; + int handle_event = 0; + int total_idle_time = 0; + int total_idle_count = 0; + for (NetEvent *vc = keep_alive_queue.head; vc != nullptr; vc = vc_next) { vc_next = vc->keep_alive_queue_link.next; _close_vc(vc, now, handle_event, closed, total_idle_time, total_idle_count); @@ -642,8 +642,7 @@ NetHandler::manage_keep_alive_queue() } void -NetHandler::_close_vc(NetEventHandler *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, - int &total_idle_count) +NetHandler::_close_vc(NetEvent *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count) { if (vc->get_thread() != this_ethread()) { return; @@ -683,7 +682,7 @@ NetHandler::_close_vc(NetEventHandler *vc, ink_hrtime now, int &handle_event, in } void -NetHandler::add_to_keep_alive_queue(NetEventHandler *vc) +NetHandler::add_to_keep_alive_queue(NetEvent *vc) { Debug("net_queue", "NetVC: %p", vc); ink_assert(mutex->thread_holding == this_ethread()); @@ -703,7 +702,7 @@ NetHandler::add_to_keep_alive_queue(NetEventHandler *vc) } void -NetHandler::remove_from_keep_alive_queue(NetEventHandler *vc) +NetHandler::remove_from_keep_alive_queue(NetEvent *vc) { Debug("net_queue", "NetVC: %p", vc); ink_assert(mutex->thread_holding == this_ethread()); @@ -715,7 +714,7 @@ NetHandler::remove_from_keep_alive_queue(NetEventHandler *vc) } bool -NetHandler::add_to_active_queue(NetEventHandler *vc) +NetHandler::add_to_active_queue(NetEvent *vc) { Debug("net_queue", "NetVC: %p", vc); Debug("net_queue", "max_connections_per_thread_in: %d active_queue_size: %d keep_alive_queue_size: %d", @@ -742,7 +741,7 @@ NetHandler::add_to_active_queue(NetEventHandler *vc) } void -NetHandler::remove_from_active_queue(NetEventHandler *vc) +NetHandler::remove_from_active_queue(NetEvent *vc) { Debug("net_queue", "NetVC: %p", vc); ink_assert(mutex->thread_holding == this_ethread()); diff --git a/iocore/net/UnixNetPages.cc b/iocore/net/UnixNetPages.cc index fe7aa9651d7..bb8f7bb4b98 100644 --- a/iocore/net/UnixNetPages.cc +++ b/iocore/net/UnixNetPages.cc @@ -27,7 +27,7 @@ #include "I_Tasks.h" struct ShowNet; -using ShowNetEventHandler = int (ShowNet::*)(int, Event *); +using ShowNetEvent = int (ShowNet::*)(int, Event *); struct ShowNet : public ShowCont { int ithread; IpEndpoint addr; @@ -61,7 +61,7 @@ struct ShowNet : public ShowCont { } ink_hrtime now = Thread::get_hrtime(); - forl_LL(NetEventHandler, vc, nh->open_list) + forl_LL(NetEvent, vc, nh->open_list) { // uint16_t port = ats_ip_port_host_order(&addr.sa); if (ats_is_ip(&addr) && !ats_ip_addr_port_eq(&addr.sa, vc->get_remote_addr())) { @@ -158,7 +158,7 @@ struct ShowNet : public ShowCont { CHECK_SHOW(show("

Thread: %d

\n", ithread)); CHECK_SHOW(show("
%s%d
%s%d
%s%d
\n")); int connections = 0; - forl_LL(NetEventHandler, vc, nh->open_list) connections++; + forl_LL(NetEvent, vc, nh->open_list) connections++; CHECK_SHOW(show("\n", "Connections", connections)); // CHECK_SHOW(show("\n", "Last Poll Size", pollDescriptor->nfds)); CHECK_SHOW(show("\n", "Last Poll Ready", pollDescriptor->result)); From 4a5094e11465d2cffb263bead037a182f09323f8 Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 16 Dec 2019 10:09:35 +0800 Subject: [PATCH 03/12] Add udp2 implementation --- iocore/net/Makefile.am | 70 +- iocore/net/UDPConnection.cc | 1072 ++++++++++++++++++++++++++++++ iocore/net/UDPConnection.h | 172 +++++ iocore/net/UDPPacket.h | 52 ++ iocore/net/UDPProcessor.cc | 95 +++ iocore/net/UDPProcessor.h | 33 + iocore/net/libinknet_stub.cc | 2 +- iocore/net/test_UDPAcceptEcho.cc | 335 ++++++++++ iocore/net/test_UDPEcho.cc | 262 ++++++++ 9 files changed, 2090 insertions(+), 3 deletions(-) create mode 100644 iocore/net/UDPConnection.cc create mode 100644 iocore/net/UDPConnection.h create mode 100644 iocore/net/UDPPacket.h create mode 100644 iocore/net/UDPProcessor.cc create mode 100644 iocore/net/UDPProcessor.h create mode 100644 iocore/net/test_UDPAcceptEcho.cc create mode 100644 iocore/net/test_UDPEcho.cc diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am index 2ec6f4e668c..dca85a11b27 100644 --- a/iocore/net/Makefile.am +++ b/iocore/net/Makefile.am @@ -37,7 +37,7 @@ AM_CPPFLAGS += \ TESTS = $(check_PROGRAMS) -check_PROGRAMS = test_certlookup test_UDPNet +check_PROGRAMS = test_certlookup test_UDPNet test_UDPEcho test_UDPAcceptEcho noinst_LIBRARIES = libinknet.a test_certlookup_LDFLAGS = \ @@ -163,7 +163,72 @@ libinknet_a_SOURCES = \ UnixNetVConnection.cc \ UnixUDPConnection.cc \ UnixUDPNet.cc \ - SSLDynlock.cc + SSLDynlock.cc \ + UDPProcessor.cc \ + UDPConnection.cc + +test_UDPEcho_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + $(iocore_include_dirs) \ + -I$(abs_top_srcdir)/proxy \ + -I$(abs_top_srcdir)/proxy/hdrs \ + -I$(abs_top_srcdir)/proxy/http \ + -I$(abs_top_srcdir)/proxy/logging \ + -I$(abs_top_srcdir)/mgmt \ + -I$(abs_top_srcdir)/mgmt/utils \ + @OPENSSL_INCLUDES@ + +test_UDPEcho_LDFLAGS = \ + @AM_LDFLAGS@ \ + @OPENSSL_LDFLAGS@ \ + @YAMLCPP_LDFLAGS@ + +test_UDPEcho_LDADD = \ + libinknet.a \ + $(top_builddir)/iocore/eventsystem/libinkevent.a \ + $(top_builddir)/mgmt/libmgmt_p.la \ + $(top_builddir)/lib/records/librecords_p.a \ + $(top_builddir)/src/tscore/libtscore.la $(top_builddir)/src/tscpp/util/libtscpputil.la \ + $(top_builddir)/proxy/ParentSelectionStrategy.o \ + @HWLOC_LIBS@ @OPENSSL_LIBS@ @LIBPCRE@ @YAMLCPP_LIBS@ + +test_UDPEcho_SOURCES = \ + libinknet_stub.cc \ + UnixUDPConnection.cc \ + UnixUDPNet.cc \ + test_UDPEcho.cc + +test_UDPAcceptEcho_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + $(iocore_include_dirs) \ + -I$(abs_top_srcdir)/proxy \ + -I$(abs_top_srcdir)/proxy/hdrs \ + -I$(abs_top_srcdir)/proxy/http \ + -I$(abs_top_srcdir)/proxy/logging \ + -I$(abs_top_srcdir)/mgmt \ + -I$(abs_top_srcdir)/mgmt/utils \ + @OPENSSL_INCLUDES@ + +test_UDPAcceptEcho_LDFLAGS = \ + @AM_LDFLAGS@ \ + @OPENSSL_LDFLAGS@ \ + @YAMLCPP_LDFLAGS@ + +test_UDPAcceptEcho_LDADD = \ + libinknet.a \ + $(top_builddir)/iocore/eventsystem/libinkevent.a \ + $(top_builddir)/mgmt/libmgmt_p.la \ + $(top_builddir)/lib/records/librecords_p.a \ + $(top_builddir)/src/tscore/libtscore.la $(top_builddir)/src/tscpp/util/libtscpputil.la \ + $(top_builddir)/proxy/ParentSelectionStrategy.o \ + @HWLOC_LIBS@ @OPENSSL_LIBS@ @LIBPCRE@ @YAMLCPP_LIBS@ + +test_UDPAcceptEcho_SOURCES = \ + libinknet_stub.cc \ + UnixUDPConnection.cc \ + UnixUDPNet.cc \ + test_UDPAcceptEcho.cc + if ENABLE_QUIC libinknet_a_SOURCES += \ @@ -192,3 +257,4 @@ include $(top_srcdir)/build/tidy.mk clang-tidy-local: $(DIST_SOURCES) $(CXX_Clang_Tidy) + diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc new file mode 100644 index 00000000000..1f2f8930725 --- /dev/null +++ b/iocore/net/UDPConnection.cc @@ -0,0 +1,1072 @@ +/** @file + + ALPNSupport.cc provides implmentations for ALPNSupport methods + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "UDPConnection.h" + +static const char * +udp_event_name(UDP2ConnectionImpl::UDPEvents e) +{ + switch (e) { + case UDP2ConnectionImpl::UDPEvents::UDP_CONNECT_EVENT: + return "UDP_CONNECT_EVENT"; + case UDP2ConnectionImpl::UDPEvents::UDP_SEND_EVENT: + return "UDP_SEND_EVENT"; + case UDP2ConnectionImpl::UDPEvents::UDP_USER_READ_READY: + return "UDP_USER_READ_READY"; + case UDP2ConnectionImpl::UDPEvents::UDP_CLOSE_EVENT: + return "UDP_CLOSE_EVENT"; + default: + return "UNKNOWN EVENT"; + }; + + return nullptr; +} + +// +// Reschedule a NetEvent by moving it +// onto or off of the ready_list +// +static inline void +read_reschedule(NetHandler *nh, NetEvent *vc) +{ + vc->ep.refresh(EVENTIO_READ); + if (vc->read.triggered && vc->read.enabled) { + nh->read_ready_list.in_or_enqueue(vc); + } else { + nh->read_ready_list.remove(vc); + } +} + +static inline void +write_reschedule(NetHandler *nh, NetEvent *vc) +{ + vc->ep.refresh(EVENTIO_WRITE); + if (vc->write.triggered && vc->write.enabled) { + nh->write_ready_list.in_or_enqueue(vc); + } else { + nh->write_ready_list.remove(vc); + } +} + +// +// UDP2ConnectionImpl +// +UDP2ConnectionImpl::UDP2ConnectionImpl(Continuation *con, EThread *thread) : _con(con), _thread(thread) +{ + this->mutex = con->mutex; + this->read.enabled = 1; // read enabled is always true because we expected all data; + if (thread == nullptr) { + this->_thread = this_ethread(); + } + if (this->mutex == nullptr) { + this->mutex = new_ProxyMutex(); + } + this->refcount_inc(); + SET_HANDLER(&UDP2ConnectionImpl::startEvent); +} + +UDP2ConnectionImpl::UDP2ConnectionImpl(AcceptUDP2ConnectionImpl *accept, Continuation *con, EThread *ethread) + : _con(con), _thread(ethread), _accept_con(accept) +{ + this->mutex = con->mutex; + this->read.enabled = 1; // read enabled is always true because we expected all data; + if (ethread == nullptr) { + this->_thread = this_ethread(); + } + if (this->mutex == nullptr) { + this->mutex = new_ProxyMutex(); + } + this->refcount_inc(); + SET_HANDLER(&UDP2ConnectionImpl::startEvent); +} + +UDP2ConnectionImpl::~UDP2ConnectionImpl() +{ + ink_assert(this->mutex->thread_holding == this_thread()); + Debug("udp_con", "connection close"); + this->mutex = nullptr; + + int fd = this->_fd; + + if (this->nh != nullptr) { + this->nh->stopIO(this); + } + + this->_fd = -1; + if (fd != -1) { + ::close(fd); + } + + SList(UDP2Packet, out_link) aq(this->_send_queue.popall()); + UDP2Packet *p; + while ((p = aq.pop())) { + delete p; + } + + SList(UDP2Packet, in_link) aq2(this->_recv_queue.popall()); + while ((p = aq2.pop())) { + delete p; + } +} + +void +UDP2ConnectionImpl::free(EThread *t) +{ + // should never be called; + ink_release_assert(0); +} + +int +UDP2ConnectionImpl::callback(int event, void *data) +{ + if (this->_con == nullptr) { + return 0; + } + + MUTEX_TRY_LOCK(lock, this->_con->mutex == nullptr ? this->mutex : this->_con->mutex, this_ethread()); + if (!lock.is_locked()) { + // TODO reuse cached event + Debug("udpcon", "callback get con lock failed"); + this->_reschedule(UDPEvents::UDP_USER_READ_READY, nullptr); + return 0; + } + return this->_con->handleEvent(event, data); +} + +void +UDP2ConnectionImpl::set_inactivity_timeout(ink_hrtime timeout_in) +{ +} + +EThread * +UDP2ConnectionImpl::get_thread() +{ + return this->_thread; +} + +int +UDP2ConnectionImpl::close() +{ + // detach contiuation. we should not callback to con after `close` has been called + this->_con = nullptr; + this->mutex = this->_thread->mutex; + + SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); + UDP2Packet *p; + while ((p = aq.pop())) { + delete p; + } + + ink_assert(this->refcount() > 0); + // if we have something to send or have events out, waiting ... + Debug("udp_conn", "connection close, refcount %d", this->refcount()); + if ((this->_send_queue.empty() && this->_send_list.empty()) && this->refcount() == 1) { + if (this->refcount_dec() == 0) { + if (this->_accept_con != nullptr) { + this->_accept_con->close_connection(this); + } else { + delete this; + } + } else { + // nothing to send, enter end state. + SET_HANDLER(&UDP2ConnectionImpl::endEvent); + } + } else { + // have something to send, waiting for sending completely. + // SET_HANDLER(&UDP2ConnectionImpl::endEvent); + this->_reschedule(UDPEvents::UDP_SEND_EVENT, nullptr); + } + + return 0; +} + +int +UDP2ConnectionImpl::get_fd() +{ + return this->_fd; +} + +Ptr & +UDP2ConnectionImpl::get_mutex() +{ + return this->mutex; +} + +ContFlags & +UDP2ConnectionImpl::get_control_flags() +{ + return _cont_flags; +} + +sockaddr const * +UDP2ConnectionImpl::get_remote_addr() +{ + return &_to.sa; +} + +const NetVCOptions & +UDP2ConnectionImpl::get_options() +{ + return _options; +} + +bool +UDP2ConnectionImpl::_is_closed() const +{ + return this->_con == nullptr; +} + +int +UDP2ConnectionImpl::mainEvent(int event, void *data) +{ + ink_assert(this->mutex->thread_holding == this->_thread); + Debug("udp_conn", "mainEvent refcount: %d", this->refcount()); + ink_assert(this->refcount_dec() > 0); + switch (static_cast(event)) { + case UDPEvents::UDP_CONNECT_EVENT: + ink_assert(data != nullptr); + this->connect(static_cast(data)); + break; + case UDPEvents::UDP_SEND_EVENT: + this->write.triggered = 1; + this->_reenable(&this->write.vio); + break; + case UDPEvents::UDP_USER_READ_READY: + this->callback(NET_EVENT_DATAGRAM_READ_READY, this); + break; + case UDPEvents::UDP_CLOSE_EVENT: + this->_process_close_connection(static_cast(data)); + break; + default: + Debug("udp_con", "unknown events: %d", event); + ink_release_assert(0); + break; + } + + if (this->_is_closed() && (this->_send_queue.empty() && this->_send_list.empty())) { + SET_HANDLER(&UDP2ConnectionImpl::endEvent); + this->handleEvent(0, nullptr); + } + + return 0; +} + +int +UDP2ConnectionImpl::startEvent(int event, void *data) +{ + // ink_assert(this->mutex->thread_holding == this->_thread); + ink_assert(this->refcount_dec() > 0); + NetHandler *nh = get_NetHandler(this->_thread); + MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread()); + if (!lock.is_locked()) { + this->refcount_inc(); + SET_HANDLER(&UDP2ConnectionImpl::startEvent); + this->_thread->schedule_in(this, net_retry_delay); + return 1; + } + + Debug("udp_conn", "startEvent complete refcount: %d", this->refcount()); + SET_HANDLER(&UDP2ConnectionImpl::mainEvent); + ink_assert(nh->startIO(this) >= 0); + return 0; +} + +int +UDP2ConnectionImpl::endEvent(int event, void *data) +{ + ink_assert(this->mutex->thread_holding == this->_thread); + ink_assert(this->refcount() > 0); + Debug("udp_conn", "endEvent refcount: %d", this->refcount()); + if (this->refcount_dec() != 0) { + return 0; + } + + MUTEX_TRY_LOCK(lock, this->nh->mutex, this_ethread()); + if (!lock.is_locked()) { + this->_reschedule(UDPEvents::UDP_SEND_EVENT, nullptr); + return 0; + } + + // kick out netevent from NetHandler + this->nh->stopIO(this); + if (this->_accept_con != nullptr) { + this->_accept_con->close_connection(this); + } else { + delete this; + } + + return 0; +} + +int +UDP2ConnectionImpl::start_io() +{ + this->refcount_inc(); + return this->startEvent(0, nullptr); +} + +int +UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_buf) +{ + int res = 0; + int fd = -1; + IpEndpoint local_addr{}; + int local_addr_len = sizeof(local_addr); + if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0) { + goto Lerror; + } + + fd = res; + if ((res = safe_fcntl(fd, F_SETFL, O_NONBLOCK)) < 0) { + goto Lerror; + } + + if (recv_buf > 0) { + if (unlikely(socketManager.set_rcvbuf_size(fd, recv_buf))) { + Debug("udp_con", "set_dnsbuf_size(%d) failed", recv_buf); + } + } + if (send_buf > 0) { + if (unlikely(socketManager.set_sndbuf_size(fd, send_buf))) { + Debug("udp_con", "set_dnsbuf_size(%d) failed", send_buf); + } + } + + if (addr->sa_family == AF_INET) { + bool succeeded = false; + int enable = 1; +#ifdef IP_PKTINFO + if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif +#ifdef IP_RECVDSTADDR + if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif + if (!succeeded) { + Debug("udp_con", "setsockeopt for pktinfo failed"); + goto Lerror; + } + } else if (addr->sa_family == AF_INET6) { + bool succeeded = false; + int enable = 1; +#ifdef IPV6_PKTINFO + if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif +#ifdef IPV6_RECVPKTINFO + if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif + if (!succeeded) { + Debug("udp_con", "setsockeopt for pktinfo failed"); + goto Lerror; + } + } + + // If this is a class D address (i.e. multicast address), use REUSEADDR. + if (ats_is_ip_multicast(addr)) { + int enable_reuseaddr = 1; + + if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&enable_reuseaddr), + sizeof(enable_reuseaddr)) < 0)) { + goto Lerror; + } + } + + if (ats_is_ip6(addr) && (res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { + goto Lerror; + } + + if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, SOCKOPT_ON, sizeof(int))) < 0) { + goto Lerror; + } + + if (-1 == socketManager.ink_bind(fd, addr, ats_ip_size(addr))) { + char buff[INET6_ADDRPORTSTRLEN]; + Debug("udp_con", "ink bind failed on %s %s", ats_ip_nptop(addr, buff, sizeof(buff)), strerror(errno)); + goto Lerror; + } + + if ((res = safe_getsockname(fd, &local_addr.sa, &local_addr_len)) < 0) { + Debug("udp_con", "CreateUdpsocket: getsockname didn't work"); + goto Lerror; + } + + ats_ip_copy(&this->_from, &local_addr.sa); + this->_fd = fd; + Debug("udp_con", "creating a udp socket port = %d---success", ats_ip_port_host_order(local_addr)); + return 0; +Lerror: + Debug("udp_con", "creating a udp socket port = %d---soft failure", ats_ip_port_host_order(local_addr)); + if (fd != -1) { + socketManager.close(fd); + } + + return -errno; +} + +IpEndpoint +UDP2ConnectionImpl::from() +{ + return this->_from; +} + +IpEndpoint +UDP2ConnectionImpl::to() +{ + return this->_to; +} + +int +UDP2ConnectionImpl::_connect(sockaddr const *addr) +{ + ink_assert(this->_fd != NO_FD); + int res = ::connect(this->_fd, addr, ats_ip_size(addr)); + if (res >= 0) { + ats_ip_copy(&_to, addr); + return 0; + } + + return -errno; +} + +int +UDP2ConnectionImpl::connect(sockaddr const *addr) +{ + int res = this->_connect(addr); + if (res < 0) { + if ((res == -EINPROGRESS) || (res == -EWOULDBLOCK)) { + this->_reschedule(UDPEvents::UDP_CONNECT_EVENT, const_cast(addr)); + return 0; + } + return this->callback(NET_EVENT_DATAGRAM_CONNECT_ERROR, this); + } + return this->callback(NET_EVENT_DATAGRAM_CONNECT_SUCCESS, this); +} + +bool +UDP2ConnectionImpl::is_connected() +{ + return this->_to.port() != 0; +} + +void +UDP2ConnectionImpl::set_continuation(Continuation *con) +{ + // ink_assert(this->mutex == nullptr); + // rebind mutex; + this->_con = con; + this->mutex = con->mutex; + if (this->mutex == nullptr) { + this->mutex = new_ProxyMutex(); + } +} + +void +UDP2ConnectionImpl::bind_thread(EThread *thread) +{ + this->_thread = thread; +} + +void +UDP2ConnectionImpl::_reschedule(UDPEvents e, void *data) +{ + this->refcount_inc(); + Debug("udp_con", "schedule event %s refcount: %d", udp_event_name(e), this->refcount()); + this->_thread->schedule_imm(this, static_cast(e), data); +} + +void +UDP2ConnectionImpl::_process_recv_event() +{ + this->callback(NET_EVENT_DATAGRAM_READ_READY, this); +} + +void +UDP2ConnectionImpl::net_read_io(NetHandler *nh, EThread *thread) +{ + ink_assert(this->nh = nh); + ink_assert(this->nh->mutex->thread_holding == thread); + MUTEX_TRY_LOCK(lock, this->mutex, thread); + if (!lock.is_locked()) { + read_reschedule(nh, this); + return; + } + + NetState *s = &this->read; + if (!s->enabled) { + read_disable(nh, this); + return; + } + + // receive packet and queue onto UDPConnection. + // don't call back connection at this time. + int64_t r; + int count = 0; + + struct msghdr msg; + Ptr chain, next_chain; + struct iovec tiovec[MAX_NIOV]; + int64_t size_index = BUFFER_SIZE_INDEX_2K; + int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index); + // The max length of receive buffer is 32 * buffer_size (2048) = 65536 bytes. + // Because the 'UDP Length' is type of uint16_t defined in RFC 768. + // And there is 8 octets in 'User Datagram Header' which means the max length of payload is no more than 65527 bytes. + do { + // create IOBufferBlock chain to receive data + unsigned int niov; + IOBufferBlock *b, *last; + + // build struct iov + // reuse the block in chain if available + b = chain.get(); + last = nullptr; + for (niov = 0; niov < MAX_NIOV; niov++) { + if (b == nullptr) { + b = new_IOBufferBlock(); + b->alloc(size_index); + if (last == nullptr) { + chain = b; + } else { + last->next = b; + } + } + + tiovec[niov].iov_base = b->buf(); + tiovec[niov].iov_len = b->block_size(); + + last = b; + b = b->next.get(); + } + + // build struct msghdr + sockaddr_in6 fromaddr; + sockaddr_in6 toaddr; + int toaddr_len = sizeof(toaddr); + char *cbuf[1024]; + msg.msg_name = &fromaddr; + msg.msg_namelen = sizeof(fromaddr); + msg.msg_iov = tiovec; + msg.msg_iovlen = niov; + msg.msg_control = cbuf; + msg.msg_controllen = sizeof(cbuf); + + // receive data by recvmsg + r = socketManager.recvmsg(this->get_fd(), &msg, 0); + if (r <= 0) { + if (r == -EAGAIN || r == -ENOTCONN) { + this->read.triggered = 0; + read_reschedule(nh, this); + break; + } + this->callback(NET_EVENT_DATAGRAM_READ_ERROR, this); + return; + } + + // truncated check + if (msg.msg_flags & MSG_TRUNC) { + Debug("udp-read", "The UDP packet is truncated"); + ink_assert(!"truncate should not happen, if so please increase MAX_NIOV"); + this->callback(NET_EVENT_DATAGRAM_READ_ERROR, this); + return; + } + + // fill the IOBufferBlock chain + int64_t saved = r; + b = chain.get(); + while (b && saved > 0) { + if (saved > buffer_size) { + b->fill(buffer_size); + saved -= buffer_size; + b = b->next.get(); + } else { + b->fill(saved); + saved = 0; + next_chain = b->next.get(); + b->next = nullptr; + } + } + + safe_getsockname(this->get_fd(), reinterpret_cast(&toaddr), &toaddr_len); + for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + switch (cmsg->cmsg_type) { +#ifdef IP_PKTINFO + case IP_PKTINFO: + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); + reinterpret_cast(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr; + } + break; +#endif +#ifdef IP_RECVDSTADDR + case IP_RECVDSTADDR: + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_addr *addr = reinterpret_cast(CMSG_DATA(cmsg)); + reinterpret_cast(&toaddr)->sin_addr.s_addr = addr->s_addr; + } + break; +#endif +#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) + case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too + if (cmsg->cmsg_level == IPPROTO_IPV6) { + struct in6_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); + memcpy(toaddr.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16); + } + break; +#endif + } + } + + // queue onto the UDPConnection + this->_recv_queue.push(new UDP2Packet(ats_ip_sa_cast(&fromaddr), ats_ip_sa_cast(&toaddr), chain)); + + // reload the unused block + chain = next_chain; + next_chain = nullptr; + count++; + } while (r > 0); + + if (count) { + this->_process_recv_event(); + } + read_reschedule(nh, this); + return; +} + +void +UDP2ConnectionImpl::net_write_io(NetHandler *nh, EThread *thread) +{ + ink_assert(this->nh = nh); + ink_assert(this->nh->mutex->thread_holding == thread); + MUTEX_TRY_LOCK(lock, this->mutex, thread); + if (!lock.is_locked()) { + write_reschedule(nh, this); + return; + } + + MUTEX_TRY_LOCK(lock2, this->nh->mutex, thread); + if (!lock2.is_locked()) { + read_reschedule(nh, this); + return; + } + + NetState *s = &this->write; + if (!s->enabled) { + write_disable(nh, this); + return; + } + + SList(UDP2Packet, out_link) aq(this->_send_queue.popall()); + UDP2Packet *p; + while ((p = aq.pop())) { + this->_send_list.push_back(UDP2PacketUPtr(p)); + } + + int count = 0; + while (!this->_send_list.empty()) { + auto p = std::move(this->_send_list.front()); + this->_send_list.pop_front(); + + int rc = 0; + if (this->is_connected()) { + rc = this->_send(p.get()); + } else { + ink_assert(p->to.port() != 0); + rc = this->_send_to(p.get()); + } + + if (rc >= 0) { + count++; + continue; + } + + if (errno != EAGAIN) { + this->write.triggered = 0; + write_reschedule(nh, this); + break; + } else { + this->write.triggered = 0; + this->callback(NET_EVENT_DATAGRAM_WRITE_ERROR, this); + return; + } + } + + if (count > 0) { + this->callback(NET_EVENT_DATAGRAM_WRITE_READY, this); + } + + return; +} + +int +UDP2ConnectionImpl::_send(UDP2Packet *p) +{ + ink_assert(this->is_connected()); + struct iovec iov[MAX_NIOV]; + int n, iov_len = 0; + + for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { + iov[iov_len].iov_base = static_cast(b->start()); + iov[iov_len].iov_len = b->size(); + iov_len++; + } + + n = socketManager.writev(this->_fd, iov, iov_len); + if (n >= 0) { + return n; + } + + Debug("udp_con", "writev failed: %s", strerror(errno)); + return -errno; +} + +int +UDP2ConnectionImpl::_send_to(UDP2Packet *p) +{ + ink_assert(this->is_connected() == false); + struct msghdr msg; + struct iovec iov[MAX_NIOV]; + int real_len = 0; + int n, iov_len = 0; + +#if !defined(solaris) + msg.msg_control = nullptr; + msg.msg_controllen = 0; + msg.msg_flags = 0; +#endif + msg.msg_name = reinterpret_cast(&p->to.sa); + msg.msg_namelen = ats_ip_size(p->to); + iov_len = 0; + + for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { + iov[iov_len].iov_base = static_cast(b->start()); + iov[iov_len].iov_len = b->size(); + real_len += iov[iov_len].iov_len; + iov_len++; + } + + msg.msg_iov = iov; + msg.msg_iovlen = iov_len; + + n = socketManager.sendmsg(this->_fd, &msg, 0); + if (n >= 0) { + return n; + } + + Debug("udp_conn", "send from external thread failed: %d-%s", errno, strerror(errno)); + return -errno; +} + +UDP2Packet * +UDP2ConnectionImpl::recv() +{ + // user should call recv immediatly when UDP2Connection callback to + // contiuation. Since the mutex is already grabed from eventsystem or + // NetHandler, we don't need explicit take lock here. + ink_assert(!this->_is_closed()); + ink_assert(this->mutex->thread_holding == this->_thread); + SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); + UDP2Packet *t; + while ((t = aq.pop())) { + this->_recv_list.push_back(UDP2PacketUPtr(t)); + } + + if (this->_recv_list.empty()) { + return nullptr; + } + + auto p = std::move(this->_recv_list.front()); + this->_recv_list.pop_front(); + auto ret = p.get(); + p.release(); + return ret; +} + +void +UDP2ConnectionImpl::_reenable(VIO *vio) +{ + NetState *state = &this->write; + if (vio != &this->write.vio) { + state = &this->read; + } + + if (state->enabled) { + return; + } + + state->enabled = 1; + EThread *t = this->mutex->thread_holding; + ink_assert(t == this_ethread()); + ink_release_assert(!closed); + if (nh->mutex->thread_holding == t) { + if (vio == &read.vio) { + ep.modify(EVENTIO_READ); + ep.refresh(EVENTIO_READ); + if (read.triggered) { + nh->read_ready_list.in_or_enqueue(this); + } else { + nh->read_ready_list.remove(this); + } + } else { + ep.modify(EVENTIO_WRITE); + ep.refresh(EVENTIO_WRITE); + if (write.triggered) { + nh->write_ready_list.in_or_enqueue(this); + } else { + nh->write_ready_list.remove(this); + } + } + } else { + MUTEX_TRY_LOCK(lock, nh->mutex, t); + if (!lock.is_locked()) { + if (vio == &read.vio) { + int isin = ink_atomic_swap(&read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(this); + } + } else { + int isin = ink_atomic_swap(&write.in_enabled_list, 1); + if (!isin) { + nh->write_enable_list.push(this); + } + } + if (likely(nh->thread)) { + nh->thread->tail_cb->signalActivity(); + } else if (nh->trigger_event) { + nh->trigger_event->ethread->tail_cb->signalActivity(); + } + } else { + if (vio == &read.vio) { + ep.modify(EVENTIO_READ); + ep.refresh(EVENTIO_READ); + if (read.triggered) { + nh->read_ready_list.in_or_enqueue(this); + } else { + nh->read_ready_list.remove(this); + } + } else { + ep.modify(EVENTIO_WRITE); + ep.refresh(EVENTIO_WRITE); + if (write.triggered) { + nh->write_ready_list.in_or_enqueue(this); + } else { + nh->write_ready_list.remove(this); + } + } + } + } +} + +int +UDP2ConnectionImpl::receive(UDP2Packet *packet) +{ + if (this->_is_closed()) { + return 0; + } + + this->_recv_queue.push(packet); + if (this->_thread == this_ethread()) { + this->callback(NET_EVENT_DATAGRAM_READ_READY, nullptr); + return 0; + } + + this->_reschedule(UDPEvents::UDP_USER_READ_READY, nullptr); + return 0; +} + +int +UDP2ConnectionImpl::send(UDP2Packet *p) +{ + ink_assert(!this->_is_closed()); + ink_assert(this->is_connected() || p->to.isValid()); + this->_send_queue.push(p); + if (this->_thread == this_thread()) { + // in local thread; + this->_reenable(&this->write.vio); + } else { + // cross thread + this->_reschedule(UDPEvents::UDP_SEND_EVENT, nullptr); + } + this->nh->signalActivity(); + return 0; +} + +void +UDP2ConnectionImpl::_process_close_connection(UDP2ConnectionImpl *con) +{ + ink_release_assert(!"never be called"); +} + +// +// AcceptUDP2ConnectionImpl +// +uint64_t +hash_code(const IpEndpoint &peer) +{ + return ats_ip_hash(&peer.sa) ^ ats_ip_port_hash(&peer.sa); +} + +UDP2ConnectionImpl * +AcceptUDP2ConnectionImpl::find_connection(const IpEndpoint &peer) +{ + ink_assert(this->mutex->thread_holding == this->_thread); + uint64_t hash = hash_code(peer); + auto &map = this->_connection_map; + auto it = map.find(hash); + if (it == map.end()) { + return nullptr; + } + + for (auto tt : it->second) { + if (ats_ip_addr_port_eq(peer, tt->to())) { + return tt; + } + } + + return nullptr; +} + +UDP2ConnectionImpl * +AcceptUDP2ConnectionImpl::create_connection(const IpEndpoint &local, const IpEndpoint &peer, Continuation *c, EThread *thread) +{ + ink_assert(this->mutex->thread_holding == this->_thread); + ink_assert(peer.isValid()); + + uint64_t hash = hash_code(peer); + auto con = this->find_connection(peer); + if (con != nullptr) { + return con; + } + + con = new UDP2ConnectionImpl(this, c, thread); + ink_release_assert(con->create_socket(local, 1048576, 1048576) >= 0); + if (con->connect(&peer.sa) < 0) { + char tmp[128] = {}; + Debug("udp_con", "Accept conn connect to peer failed: %s", ats_ip_ntop(&peer.sa, tmp, 128)); + delete con; + return nullptr; + } + + this->refcount_inc(); + auto it = this->_connection_map.find(hash); + if (it == this->_connection_map.end()) { + std::list l; + l.push_back(con); + this->_connection_map.insert(std::make_pair(hash, l)); + } else { + it->second.push_back(con); + } + + // in this time every packets read from accept will dispatch to new conn + // So we need to take old packet which already in list. + SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); + Queue st; + UDP2Packet *p; + while ((p = aq.pop())) { + st.push(p); + } + + while ((p = st.pop())) { + this->_recv_list.push_back(UDP2PacketUPtr(p)); + } + + for (auto it = this->_recv_list.begin(); it != this->_recv_list.end(); it++) { + if (ats_ip_addr_port_eq((*it)->to, con->to())) { + auto p = (*it).get(); + (*it).release(); + this->_recv_list.erase(it); + con->receive(p); + } + } + + ink_assert(con->start_io() >= 0); + + return con; +} + +void +AcceptUDP2ConnectionImpl::_process_close_connection(UDP2ConnectionImpl *con) +{ + ink_assert(con != nullptr); + uint64_t hash = hash_code(con->to()); + auto it = this->_connection_map.find(hash); + if (it != this->_connection_map.end()) { + for (auto itt = it->second.begin(); itt != it->second.end(); itt++) { + if (*itt == con) { + it->second.erase(itt); + if (it->second.empty()) { + this->_connection_map.erase(it); + } + this->refcount_dec(); + break; + } + } + } + + delete con; +} + +int +AcceptUDP2ConnectionImpl::close_connection(UDP2ConnectionImpl *con) +{ + ink_assert(con != nullptr); + ink_assert(con->refcount() == 0); + MUTEX_TRY_LOCK(lock, this->mutex, this_ethread()); + if (lock.is_locked()) { + this->_process_close_connection(con); + return 0; + } + + this->_reschedule(UDPEvents::UDP_CLOSE_EVENT, con); + return 0; +} + +void +AcceptUDP2ConnectionImpl::_process_recv_event() +{ + SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); + Queue st; + UDP2Packet *p; + while ((p = aq.pop())) { + st.push(p); + } + + while ((p = st.pop())) { + auto c = this->find_connection(p->from); + if (c == nullptr) { + this->_recv_list.push_back(UDP2PacketUPtr(p)); + } else { + c->receive(p); + } + } + + // Have something to read, signal continuation + if (!this->_recv_list.empty()) { + this->callback(NET_EVENT_DATAGRAM_READ_READY, this); + } +} diff --git a/iocore/net/UDPConnection.h b/iocore/net/UDPConnection.h new file mode 100644 index 00000000000..eae419ddca2 --- /dev/null +++ b/iocore/net/UDPConnection.h @@ -0,0 +1,172 @@ +/** @file + * + * OpenSSL socket BIO that does TCP Fast Open. + * + * @section license License + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "tscore/ink_sock.h" +#include "I_EventSystem.h" +#include "P_Net.h" +#include "NetEvent.h" +#include "UDPPacket.h" + +#define NET_EVENT_DATAGRAM_CONNECT_SUCCESS (NET_EVENT_EVENTS_START + 170) +#define NET_EVENT_DATAGRAM_CONNECT_ERROR (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) +#define NET_EVENT_DATAGRAM_WRITE_READY (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) + +class AcceptUDP2ConnectionImpl; + +class UDP2Connection : public NetEvent +{ +public: + virtual ~UDP2Connection() {} + virtual int send(UDP2Packet *) = 0; + virtual UDP2Packet *recv() = 0; + + virtual int close() = 0; + virtual void set_continuation(Continuation *con) = 0; + virtual IpEndpoint from() = 0; + virtual IpEndpoint to() = 0; +}; + +class UDP2ConnectionImpl : public UDP2Connection, public Continuation, public RefCountObj +{ +public: + UDP2ConnectionImpl() = delete; + UDP2ConnectionImpl(AcceptUDP2ConnectionImpl *accpet, Continuation *con, EThread *ethread = nullptr); + // independent allocate. + UDP2ConnectionImpl(Continuation *con, EThread *ethread = nullptr); + ~UDP2ConnectionImpl(); + + enum class UDPEvents : uint8_t { + UDP_CONNECT_EVENT, + UDP_SEND_EVENT, + UDP_USER_READ_READY, + UDP_CLOSE_EVENT, + }; + + void + free() override + { + ink_release_assert(!"unimplement"); + } + + // NetEventHandler + void net_read_io(NetHandler *nh, EThread *lthread) override; + void net_write_io(NetHandler *nh, EThread *lthread) override; + void free(EThread *t) override; + int callback(int event = CONTINUATION_EVENT_NONE, void *data = nullptr) override; + void set_inactivity_timeout(ink_hrtime timeout_in) override; + EThread *get_thread() override; + int close() override; + int get_fd() override; + Ptr &get_mutex() override; + ContFlags &get_control_flags() override; + sockaddr const *get_remote_addr() override; + const NetVCOptions &get_options() override; + int start_io(); + + // UDP2Connection + int send(UDP2Packet *packet) override; + UDP2Packet *recv() override; + IpEndpoint from() override; + IpEndpoint to() override; + void set_continuation(Continuation *con) override; + + int create_socket(sockaddr const *addr, int recv_buf = 0, int send_buf = 0); + int connect(sockaddr const *addr); + bool is_connected(); + void bind_thread(EThread *thread); + int receive(UDP2Packet *packet); + + int startEvent(int event, void *data); + int mainEvent(int event, void *data); + int endEvent(int event, void *data); + +protected: + bool _is_closed() const; + void _reschedule(UDPEvents e, void *data); + void _reenable(VIO *vio); + virtual void _process_close_connection(UDP2ConnectionImpl *con); + + // Because Accept UDPConnection need to dispatch packet to different UDP2Connection. + // The recv buffer should be visiable to AcceptUDPConnection. + // FIXME: These should more abstract + virtual void _process_recv_event(); + ASLL(UDP2Packet, in_link) _recv_queue; + std::deque _recv_list; + + Continuation *_con = nullptr; + EThread *_thread = nullptr; + +private: + // control max data size per read, This can be calculated as MAX_NIOV * 1024 / read + static constexpr int MAX_NIOV = 1; + + ASLL(UDP2Packet, out_link) _send_queue; + + // internal schedule. + int _readv(struct iovec *iov, int len); + int _readv_from(IpEndpoint &from, struct iovec *iov, int len); + int _send_to(UDP2Packet *p); + int _send(UDP2Packet *p); + int _connect(sockaddr const *addr); + + IpEndpoint _from{}; + IpEndpoint _to{}; + + int _fd = -1; + AcceptUDP2ConnectionImpl *_accept_con = nullptr; + + // TODO removed + NetVCOptions _options{}; + ContFlags _cont_flags{}; + + std::deque _send_list; +}; + +// Accept UDP2Connection is ranning in ET_UDP, and dispatch UDPPacket to different sub connections in _connection_map +// So PacketHandler should manager all AcceptUDP2Connection to find the correct connection across diffierent listen +// Addrs. +// FIXME: In current implementable, every AcceptUDP2ConnectionImpl are independent. That means the client should always +// send to the same local addr. +class AcceptUDP2ConnectionImpl : public UDP2ConnectionImpl +{ +public: + AcceptUDP2ConnectionImpl(Continuation *c, EThread *thread) : UDP2ConnectionImpl(c, thread) {} + AcceptUDP2ConnectionImpl() = delete; + UDP2ConnectionImpl *create_connection(const IpEndpoint &from, const IpEndpoint &to, Continuation *c, EThread *thread); + UDP2ConnectionImpl *find_connection(const IpEndpoint &peer); + int close_connection(UDP2ConnectionImpl *con); + + int mainEvent(int event, void *data); + +private: + virtual void _process_close_connection(UDP2ConnectionImpl *con) override; + virtual void _process_recv_event() override; + + UDP2ConnectionImpl *_create_connection(const IpEndpoint &from, const IpEndpoint &to, Continuation *c, EThread *thread); + std::unordered_map> _connection_map; +}; + +using UDP2ConnectionSPtr = std::shared_ptr; diff --git a/iocore/net/UDPPacket.h b/iocore/net/UDPPacket.h new file mode 100644 index 00000000000..5c99eaca9fb --- /dev/null +++ b/iocore/net/UDPPacket.h @@ -0,0 +1,52 @@ +/** @file + + ALPNSupport.cc provides implmentations for ALPNSupport methods + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#pragma once + +#include "tscore/ink_sock.h" +#include "tscore/ink_inet.h" +#include "I_EventSystem.h" + +#include + +class UDP2Connection; + +struct UDP2Packet { + UDP2Packet() = default; + UDP2Packet(const IpEndpoint &from, const IpEndpoint &to, Ptr &chain) : from(from), to(to), chain(chain) {} + UDP2Packet(sockaddr const *from, sockaddr *to, Ptr &chain) : chain(chain) + { + ats_ip_copy(&this->from, from); + ats_ip_copy(&this->to, to); + } + + ~UDP2Packet() { this->chain = nullptr; } + IpEndpoint from{}; + IpEndpoint to{}; + Ptr chain; + + SLINK(UDP2Packet, in_link); + SLINK(UDP2Packet, out_link); + LINK(UDP2Packet, link); +}; + +using UDP2PacketUPtr = std::unique_ptr; diff --git a/iocore/net/UDPProcessor.cc b/iocore/net/UDPProcessor.cc new file mode 100644 index 00000000000..237e3a1b280 --- /dev/null +++ b/iocore/net/UDPProcessor.cc @@ -0,0 +1,95 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "UDPProcessor.h" +#include "P_Net.h" + +UDP2NetProcessor udp2Net; +EventType ET_UDP2; + +// void +// initialize_thread_for_udp_net(EThread *thread) +// { +// NetHandler *nh = get_NetHandler(thread); +// +// new (reinterpret_cast(nh)) NetHandler(); +// new (reinterpret_cast(get_PollCont(thread))) PollCont(thread->mutex, nh); +// +// PollCont *pc = get_PollCont(thread); +// PollDescriptor *pd = pc->pollDescriptor; +// +// nh->configure_per_thread_values(); +// thread->set_tail_handler(nh); +// thread->ep = new EventIO(); +// thread->ep->type = EVENTIO_ASYNC_SIGNAL; +// #if HAVE_EVENTFD +// thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ); +// #else +// thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ); +// #endif +// } + +void +initialize_thread_for_udp2_net(EThread *thread) +{ + NetHandler *nh = get_NetHandler(thread); + + new (reinterpret_cast(nh)) NetHandler(); + new (reinterpret_cast(get_PollCont(thread))) PollCont(thread->mutex, nh); + nh->mutex = new_ProxyMutex(); + nh->thread = thread; + + PollCont *pc = get_PollCont(thread); + PollDescriptor *pd = pc->pollDescriptor; + + memcpy(&nh->config, &NetHandler::global_config, sizeof(NetHandler::global_config)); + nh->configure_per_thread_values(); + + thread->set_tail_handler(nh); + thread->ep = static_cast(ats_malloc(sizeof(EventIO))); + new (thread->ep) EventIO(); + thread->ep->type = EVENTIO_ASYNC_SIGNAL; +#if HAVE_EVENTFD + thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ); +#else + thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ); +#endif +} + +int +UDP2NetProcessor::start(int n_upd_threads, size_t stacksize) +{ + if (n_upd_threads < 1) { + return -1; + } + + if (unix_netProcessor.pollCont_offset < 0) { + unix_netProcessor.pollCont_offset = eventProcessor.allocate(sizeof(PollCont)); + } + + if (unix_netProcessor.netHandler_offset < 0) + unix_netProcessor.netHandler_offset = eventProcessor.allocate(sizeof(NetHandler)); + + ET_UDP2 = eventProcessor.register_event_type("ET_UDP2"); + eventProcessor.schedule_spawn(&initialize_thread_for_udp2_net, ET_UDP2); + eventProcessor.spawn_event_threads(ET_UDP2, n_upd_threads, stacksize); + return 0; +} diff --git a/iocore/net/UDPProcessor.h b/iocore/net/UDPProcessor.h new file mode 100644 index 00000000000..f12bdae6b38 --- /dev/null +++ b/iocore/net/UDPProcessor.h @@ -0,0 +1,33 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include "I_EventSystem.h" + +class UDP2NetProcessor : public Processor +{ +public: + int start(int n_upd_threads, size_t stacksize) override; +}; + +extern EventType ET_UDP2; +extern UDP2NetProcessor udp2Net; diff --git a/iocore/net/libinknet_stub.cc b/iocore/net/libinknet_stub.cc index 11fee253e2f..d55e7051612 100644 --- a/iocore/net/libinknet_stub.cc +++ b/iocore/net/libinknet_stub.cc @@ -47,7 +47,7 @@ DNSConnection::trigger() void StatPagesManager::register_http(char const *, Action *(*)(Continuation *, HTTPHdr *)) { - ink_assert(false); + // ink_assert(false); } #include "ParentSelection.h" diff --git a/iocore/net/test_UDPAcceptEcho.cc b/iocore/net/test_UDPAcceptEcho.cc new file mode 100644 index 00000000000..309aa989cec --- /dev/null +++ b/iocore/net/test_UDPAcceptEcho.cc @@ -0,0 +1,335 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include +#include +#include + +#include "tscore/I_Layout.h" +#include "tscore/TestBox.h" + +#include "I_EventSystem.h" +#include "I_Net.h" +#include "UDPConnection.h" +#include "UDPProcessor.h" +#include "records/I_RecProcess.h" +#include "RecordsConfig.h" + +#include "diags.i" + +void +signal_handler(int signum) +{ + std::exit(EXIT_SUCCESS); +} + +// StatPagesManager statPagesManager; +class CloseCont : public Continuation +{ +public: + int + mainEvent(int event, void *data) + { + signal_handler(0); + return 0; + } + + CloseCont() { SET_HANDLER(&CloseCont::mainEvent); } +}; + +static CloseCont close_cont; + +static constexpr char payload[] = "hello world"; +static constexpr char payload2[] = "hello world2"; +in_port_t port = 0; +int pfd[2]; // Pipe used to signal client with transient port. + +class EchoServer : public Continuation +{ +public: + int + closeEvent(int event, void *data) + { + ink_assert(event == NET_EVENT_DATAGRAM_WRITE_READY); + ink_assert(this->_sub_conn = static_cast(data)); + if (this->_sub_conn != nullptr) { + std::cout << "enter close event" << std::endl; + this->_sub_conn->close(); + this->_sub_conn = nullptr; + this_ethread()->schedule_in(&close_cont, 1 * HRTIME_SECOND); + } + return 0; + } + + int + subEvent(int event, void *data) + { + IpEndpoint empty{}; + switch (event) { + case NET_EVENT_DATAGRAM_CONNECT_SUCCESS: { + this->_sub_conn = static_cast(data); + auto packet = new UDP2Packet(); + packet->from = this->_packet->to; + packet->to = this->_packet->from; + packet->chain = this->_packet->chain; + ink_assert(this->_conn->send(packet) != EVENT_CONT); + delete this->_packet; + this->_packet = nullptr; + break; + } + case NET_EVENT_DATAGRAM_READ_READY: { + ink_assert(this->_sub_conn == static_cast(data)); + auto packet = this->_sub_conn->recv(); + if (packet == nullptr) { + break; + } + std::string str(packet->chain->start(), packet->chain->read_avail()); + std::cout << "receive msg1: " << str << std::endl; + delete packet; + + packet = this->_sub_conn->recv(); + if (packet == nullptr) { + break; + } + std::string str2(packet->chain->start(), packet->chain->read_avail()); + std::cout << "receive msg2: " << str2 << std::endl; + + ink_assert(this->_sub_conn->recv() == nullptr); + + packet->from = empty; + packet->to = empty; + ink_assert(this->_sub_conn->is_connected()); + SET_HANDLER(&EchoServer::closeEvent); + this->_sub_conn->send(packet); + break; + } + case NET_EVENT_DATAGRAM_WRITE_READY: { + // ink_assert(this->_sub_conn = static_cast(data)); + // this->_sub_conn->close(); + // this->_sub_conn = nullptr; + // this_ethread()->schedule_in(&close_cont, 1 * HRTIME_SECOND); + break; + } + default: + ink_assert(0); + break; + } + return 0; + } + + int + mainEvent(int event, void *data) + { + switch (event) { + case NET_EVENT_DATAGRAM_READ_READY: { + SET_HANDLER(&EchoServer::subEvent); + auto t = eventProcessor.assign_thread(ET_NET); + this->_packet = this->_conn->recv(); + std::string str(this->_packet->chain->start(), this->_packet->chain->read_avail()); + std::cout << "receive msg: " << str << std::endl; + // sleep(5); + this->_sub_conn = this->_conn->create_connection(this->_conn->from(), this->_packet->from, this, t); + ink_assert(this->_sub_conn != nullptr); + ink_assert(this->_conn == static_cast(data)); + break; + } + case NET_EVENT_DATAGRAM_WRITE_READY: { + break; + } + } + return 0; + } + + EchoServer() + { + SET_HANDLER(&EchoServer::mainEvent); + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; + + auto t = eventProcessor.assign_thread(ET_UDP2); + + this->_conn = new AcceptUDP2ConnectionImpl(this, t); + int res = this->_conn->create_socket(reinterpret_cast(&addr)); + if (res < 0) { + std::cout << "create socket error [" << strerror(errno) << "]" << std::endl; + std::exit(EXIT_FAILURE); + } + + std::cout << "bind to port: " << ats_ip_port_host_order(this->_conn->from()) << std::endl; + this->_conn->refcount_inc(); + this->_conn->start_io(); + int port = ats_ip_port_host_order(this->_conn->from()); + ink_release_assert(write(pfd[1], &port, sizeof(port)) == sizeof(port)); + } + ~EchoServer() { this->_data = nullptr; } + +private: + UDP2ConnectionImpl *_sub_conn; + AcceptUDP2ConnectionImpl *_conn; + Ptr _data; + UDP2Packet *_packet = nullptr; +}; + +void +udp_client(TestBox &box) +{ + char buf[128] = {0}; + int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) { + std::cout << "Couldn't create socket" << std::endl; + std::exit(EXIT_FAILURE); + } + + struct timeval tv; + tv.tv_sec = 20; + tv.tv_usec = 0; + + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&tv), sizeof(tv)); + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&tv), sizeof(tv)); + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = htons(port); + + ssize_t n = sendto(sock, payload, sizeof(payload), 0, reinterpret_cast(&addr), sizeof(addr)); + if (n < 0) { + std::cout << "Couldn't send udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + + ssize_t l = recv(sock, buf, sizeof(buf), 0); + if (l < 0) { + std::cout << "Couldn't recv udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + std::cout << "client recv: " << buf << std::endl; + box.check(strncmp(buf, payload, sizeof(payload)) == 0, "echo doesn't match"); + + n = sendto(sock, payload2, sizeof(payload2), 0, reinterpret_cast(&addr), sizeof(addr)); + if (n < 0) { + std::cout << "Couldn't send udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + + n = sendto(sock, payload2, sizeof(payload2), 0, reinterpret_cast(&addr), sizeof(addr)); + if (n < 0) { + std::cout << "Couldn't send udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + + memset(buf, 0, 128); + l = recv(sock, buf, sizeof(buf), 0); + if (l < 0) { + std::cout << "Couldn't recv udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + std::cout << "client recv2: " << buf << std::endl; + box.check(strncmp(buf, payload2, sizeof(payload2)) == 0, "echo connect doesn't match"); + + close(sock); +} + +void +udp_echo_server() +{ + Layout::create(); + RecModeT mode_type = RECM_STAND_ALONE; + RecProcessInit(mode_type); + + Thread *main_thread = new EThread(); + main_thread->set_specific(); + net_config_poll_timeout = 10; + RecProcessInit(RECM_STAND_ALONE); + LibRecordsConfigInit(); + ink_net_init(ts::ModuleVersion(1, 0, ts::ModuleVersion::PRIVATE)); + + // statPagesManager.init(); + init_diags("udp", nullptr); + ink_event_system_init(EVENT_SYSTEM_MODULE_PUBLIC_VERSION); + netProcessor.init(); + eventProcessor.start(1); + udp2Net.start(1, 1048576); + + initialize_thread_for_net(this_ethread()); + + signal(SIGPIPE, SIG_IGN); + signal(SIGTERM, signal_handler); + + EchoServer *server = new EchoServer; + (void)server; + + this_thread()->execute(); +} + +REGRESSION_TEST(UDPNet_echo)(RegressionTest *t, int /* atype ATS_UNUSED */, int *pstatus) +{ + TestBox box(t, pstatus); + box = REGRESSION_TEST_PASSED; + + int z = pipe(pfd); + if (z < 0) { + std::cout << "Unable to create pipe" << std::endl; + std::exit(EXIT_FAILURE); + } + + pid_t pid = fork(); + if (pid < 0) { + std::cout << "Couldn't fork" << std::endl; + std::exit(EXIT_FAILURE); + } else if (pid == 0) { + close(pfd[0]); + udp_echo_server(); + } else { + close(pfd[1]); + if (read(pfd[0], &port, sizeof(port)) <= 0) { + std::cout << "Failed to get signal with port data [" << errno << ']' << std::endl; + std::exit(EXIT_FAILURE); + } + Debug("udp_echo", "client get ports: %d", port); + udp_client(box); + + // kill(pid, SIGTERM); + int status; + wait(&status); + + if (WIFEXITED(status) && WEXITSTATUS(status) != 0) { + std::cout << "UDP Echo Server exit failure" << std::endl; + std::exit(EXIT_FAILURE); + } + } +} + +int +main(int /* argc ATS_UNUSED */, const char ** /* argv ATS_UNUSED */) +{ + RegressionTest::run("UDPNet", REGRESSION_TEST_QUICK); + return RegressionTest::final_status == REGRESSION_TEST_PASSED ? 0 : 1; +} diff --git a/iocore/net/test_UDPEcho.cc b/iocore/net/test_UDPEcho.cc new file mode 100644 index 00000000000..688b62b34b0 --- /dev/null +++ b/iocore/net/test_UDPEcho.cc @@ -0,0 +1,262 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include +#include +#include + +#include "tscore/I_Layout.h" +#include "tscore/TestBox.h" + +#include "I_EventSystem.h" +#include "I_Net.h" +#include "UDPConnection.h" +#include "UDPProcessor.h" +#include "records/I_RecProcess.h" +#include "RecordsConfig.h" + +#include "diags.i" + +void +signal_handler(int signum) +{ + std::exit(EXIT_SUCCESS); +} + +// StatPagesManager statPagesManager; +class CloseCont : public Continuation +{ +public: + int + mainEvent(int event, void *data) + { + signal_handler(0); + return 0; + } + + CloseCont() { SET_HANDLER(&CloseCont::mainEvent); } +}; + +static CloseCont close_cont; + +static constexpr char payload[] = "hello world"; +in_port_t port = 0; +int pfd[2]; // Pipe used to signal client with transient port. + +class EchoServer : public Continuation +{ +public: + int + mainEvent(int event, void *data) + { + switch (event) { + case NET_EVENT_DATAGRAM_READ_READY: { + UDP2Connection *con = static_cast(data); + ink_assert(this->_conn.get() == con); + auto packet = this->_conn->recv(); + this->_data = packet->chain; + std::string str(packet->chain->start(), packet->chain->read_avail()); + std::cout << "receive msg: " << str << std::endl; + this->_peer = packet->from; + packet->from = packet->to; + packet->to = this->_peer; + ink_assert(this->_conn->send(packet) != EVENT_CONT); + break; + } + case NET_EVENT_DATAGRAM_WRITE_READY: { + if (this->_data != nullptr) { + UDP2Packet *new_p = new UDP2Packet; + new_p->chain = this->_data; + this->_data = nullptr; + ink_assert(this->_conn->connect(&this->_peer.sa) == 0); + ink_assert(this->_conn->is_connected()); + ink_assert(this->_conn->send(new_p) != EVENT_CONT); + this_ethread()->schedule_in(&close_cont, 1 * HRTIME_SECOND); + } else { + this->_conn->close(); + this->_conn.release(); + this->_conn = nullptr; + } + break; + } + } + return 0; + } + + EchoServer() + { + SET_HANDLER(&EchoServer::mainEvent); + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; + + this->_conn = std::make_unique(this); + int res = this->_conn->create_socket(reinterpret_cast(&addr)); + if (res < 0) { + std::cout << "create socket error [" << strerror(errno) << "]" << std::endl; + std::exit(EXIT_FAILURE); + } + + std::cout << "bind to port: " << ats_ip_port_host_order(this->_conn->from()) << std::endl; + this->_conn->refcount_inc(); + this->_conn->start_io(); + int port = ats_ip_port_host_order(this->_conn->from()); + ink_release_assert(write(pfd[1], &port, sizeof(port)) == sizeof(port)); + } + ~EchoServer() { this->_data = nullptr; } + +private: + std::unique_ptr _conn; + Ptr _data; + IpEndpoint _peer{}; +}; + +void +udp_client(TestBox &box) +{ + char buf[128] = {0}; + int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) { + std::cout << "Couldn't create socket" << std::endl; + std::exit(EXIT_FAILURE); + } + + struct timeval tv; + tv.tv_sec = 20; + tv.tv_usec = 0; + + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&tv), sizeof(tv)); + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&tv), sizeof(tv)); + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = htons(port); + + ssize_t n = sendto(sock, payload, sizeof(payload), 0, reinterpret_cast(&addr), sizeof(addr)); + if (n < 0) { + std::cout << "Couldn't send udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + + ssize_t l = recv(sock, buf, sizeof(buf), 0); + if (l < 0) { + std::cout << "Couldn't recv udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + std::cout << "client recv: " << buf << std::endl; + box.check(strncmp(buf, payload, sizeof(payload)) == 0, "echo doesn't match"); + + memset(buf, 0, 128); + l = recv(sock, buf, sizeof(buf), 0); + if (l < 0) { + std::cout << "Couldn't recv udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + std::cout << "client recv2: " << buf << std::endl; + box.check(strncmp(buf, payload, sizeof(payload)) == 0, "echo connect doesn't match"); + + close(sock); +} + +void +udp_echo_server() +{ + Layout::create(); + RecModeT mode_type = RECM_STAND_ALONE; + RecProcessInit(mode_type); + + Thread *main_thread = new EThread(); + main_thread->set_specific(); + net_config_poll_timeout = 10; + RecProcessInit(RECM_STAND_ALONE); + LibRecordsConfigInit(); + ink_net_init(ts::ModuleVersion(1, 0, ts::ModuleVersion::PRIVATE)); + + // statPagesManager.init(); + init_diags("udp", nullptr); + ink_event_system_init(EVENT_SYSTEM_MODULE_PUBLIC_VERSION); + netProcessor.init(); + eventProcessor.start(1); + udp2Net.start(1, 1048576); + + initialize_thread_for_net(this_ethread()); + + signal(SIGPIPE, SIG_IGN); + signal(SIGTERM, signal_handler); + + EchoServer *server = new EchoServer; + (void)server; + // eventProcessor.schedule_in(server, 1); + + this_thread()->execute(); +} + +REGRESSION_TEST(UDPNet_echo)(RegressionTest *t, int /* atype ATS_UNUSED */, int *pstatus) +{ + TestBox box(t, pstatus); + box = REGRESSION_TEST_PASSED; + + int z = pipe(pfd); + if (z < 0) { + std::cout << "Unable to create pipe" << std::endl; + std::exit(EXIT_FAILURE); + } + + pid_t pid = fork(); + if (pid < 0) { + std::cout << "Couldn't fork" << std::endl; + std::exit(EXIT_FAILURE); + } else if (pid == 0) { + close(pfd[0]); + udp_echo_server(); + } else { + close(pfd[1]); + if (read(pfd[0], &port, sizeof(port)) <= 0) { + std::cout << "Failed to get signal with port data [" << errno << ']' << std::endl; + std::exit(EXIT_FAILURE); + } + Debug("udp_echo", "client get ports: %d", port); + udp_client(box); + + // kill(pid, SIGTERM); + int status; + wait(&status); + + if (WIFEXITED(status) && WEXITSTATUS(status) != 0) { + std::cout << "UDP Echo Server exit failure" << std::endl; + std::exit(EXIT_FAILURE); + } + } +} + +int +main(int /* argc ATS_UNUSED */, const char ** /* argv ATS_UNUSED */) +{ + RegressionTest::run("UDPNet", REGRESSION_TEST_QUICK); + return RegressionTest::final_status == REGRESSION_TEST_PASSED ? 0 : 1; +} From 95ef7d9e39a501f823605b654d38ae9edcc094b1 Mon Sep 17 00:00:00 2001 From: scw00 Date: Tue, 17 Dec 2019 13:45:09 +0800 Subject: [PATCH 04/12] Add more debug log --- iocore/net/UDPConnection.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 1f2f8930725..c1bdbf47560 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -240,7 +240,7 @@ int UDP2ConnectionImpl::mainEvent(int event, void *data) { ink_assert(this->mutex->thread_holding == this->_thread); - Debug("udp_conn", "mainEvent refcount: %d", this->refcount()); + Debug("udp_conn", "mainEvent refcount: %d %s", this->refcount(), udp_event_name(static_cast(event))); ink_assert(this->refcount_dec() > 0); switch (static_cast(event)) { case UDPEvents::UDP_CONNECT_EVENT: From fcb9eda781eb8e2fdb92049aef72c1280c3472bf Mon Sep 17 00:00:00 2001 From: scw00 Date: Tue, 17 Dec 2019 14:18:40 +0800 Subject: [PATCH 05/12] Fix ci assertion --- iocore/net/test_UDPAcceptEcho.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/iocore/net/test_UDPAcceptEcho.cc b/iocore/net/test_UDPAcceptEcho.cc index 309aa989cec..7c17501d99e 100644 --- a/iocore/net/test_UDPAcceptEcho.cc +++ b/iocore/net/test_UDPAcceptEcho.cc @@ -70,6 +70,10 @@ class EchoServer : public Continuation int closeEvent(int event, void *data) { + if (event == NET_EVENT_DATAGRAM_READ_READY) { + return 0; + } + ink_assert(event == NET_EVENT_DATAGRAM_WRITE_READY); ink_assert(this->_sub_conn = static_cast(data)); if (this->_sub_conn != nullptr) { From cf1a64892ba6847e8784e4db308df8757e65c175 Mon Sep 17 00:00:00 2001 From: scw00 Date: Tue, 17 Dec 2019 16:21:55 +0800 Subject: [PATCH 06/12] Revert bool closed change --- iocore/net/NetEvent.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iocore/net/NetEvent.h b/iocore/net/NetEvent.h index 839dfe41787..85000fa82fd 100644 --- a/iocore/net/NetEvent.h +++ b/iocore/net/NetEvent.h @@ -64,7 +64,7 @@ class NetEvent NetState read{}; NetState write{}; - bool closed = false; + int closed = 0; NetHandler *nh = nullptr; unsigned int id = 0; From 97f92afc449a0ac5c7e1ceb906a423ed92ad74ee Mon Sep 17 00:00:00 2001 From: scw00 Date: Thu, 19 Dec 2019 14:47:01 +0800 Subject: [PATCH 07/12] Fix write_reenable --- iocore/net/UDPConnection.cc | 2 +- iocore/net/UDPProcessor.cc | 22 ---------------------- iocore/net/test_UDPEcho.cc | 7 +++---- 3 files changed, 4 insertions(+), 27 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index c1bdbf47560..1ca2193e400 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -705,7 +705,7 @@ UDP2ConnectionImpl::net_write_io(NetHandler *nh, EThread *thread) continue; } - if (errno != EAGAIN) { + if (errno == EAGAIN) { this->write.triggered = 0; write_reschedule(nh, this); break; diff --git a/iocore/net/UDPProcessor.cc b/iocore/net/UDPProcessor.cc index 237e3a1b280..07a851aac80 100644 --- a/iocore/net/UDPProcessor.cc +++ b/iocore/net/UDPProcessor.cc @@ -25,28 +25,6 @@ UDP2NetProcessor udp2Net; EventType ET_UDP2; -// void -// initialize_thread_for_udp_net(EThread *thread) -// { -// NetHandler *nh = get_NetHandler(thread); -// -// new (reinterpret_cast(nh)) NetHandler(); -// new (reinterpret_cast(get_PollCont(thread))) PollCont(thread->mutex, nh); -// -// PollCont *pc = get_PollCont(thread); -// PollDescriptor *pd = pc->pollDescriptor; -// -// nh->configure_per_thread_values(); -// thread->set_tail_handler(nh); -// thread->ep = new EventIO(); -// thread->ep->type = EVENTIO_ASYNC_SIGNAL; -// #if HAVE_EVENTFD -// thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ); -// #else -// thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ); -// #endif -// } - void initialize_thread_for_udp2_net(EThread *thread) { diff --git a/iocore/net/test_UDPEcho.cc b/iocore/net/test_UDPEcho.cc index 688b62b34b0..637e0247dcf 100644 --- a/iocore/net/test_UDPEcho.cc +++ b/iocore/net/test_UDPEcho.cc @@ -72,7 +72,7 @@ class EchoServer : public Continuation switch (event) { case NET_EVENT_DATAGRAM_READ_READY: { UDP2Connection *con = static_cast(data); - ink_assert(this->_conn.get() == con); + ink_assert(this->_conn == con); auto packet = this->_conn->recv(); this->_data = packet->chain; std::string str(packet->chain->start(), packet->chain->read_avail()); @@ -94,7 +94,6 @@ class EchoServer : public Continuation this_ethread()->schedule_in(&close_cont, 1 * HRTIME_SECOND); } else { this->_conn->close(); - this->_conn.release(); this->_conn = nullptr; } break; @@ -111,7 +110,7 @@ class EchoServer : public Continuation addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); addr.sin_port = 0; - this->_conn = std::make_unique(this); + this->_conn = new UDP2ConnectionImpl(this); int res = this->_conn->create_socket(reinterpret_cast(&addr)); if (res < 0) { std::cout << "create socket error [" << strerror(errno) << "]" << std::endl; @@ -127,7 +126,7 @@ class EchoServer : public Continuation ~EchoServer() { this->_data = nullptr; } private: - std::unique_ptr _conn; + UDP2ConnectionImpl *_conn; Ptr _data; IpEndpoint _peer{}; }; From ff1d063b484bcf5b10ce1c0518c3dec2c4c347b0 Mon Sep 17 00:00:00 2001 From: scw00 Date: Thu, 19 Dec 2019 16:58:00 +0800 Subject: [PATCH 08/12] Use REUSE_ADDR for udp connection --- iocore/net/UDPConnection.cc | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 1ca2193e400..6b3e676efb5 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -389,13 +389,8 @@ UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_b } // If this is a class D address (i.e. multicast address), use REUSEADDR. - if (ats_is_ip_multicast(addr)) { - int enable_reuseaddr = 1; - - if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&enable_reuseaddr), - sizeof(enable_reuseaddr)) < 0)) { - goto Lerror; - } + if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(SOCKOPT_ON), sizeof(int)) < 0)) { + goto Lerror; } if (ats_is_ip6(addr) && (res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { From 1ecd8115ce3b1f750714f2f72b0be3ddd05a0aa1 Mon Sep 17 00:00:00 2001 From: scw00 Date: Fri, 20 Dec 2019 10:18:23 +0800 Subject: [PATCH 09/12] Fix release assertion --- iocore/net/UDPConnection.cc | 44 +++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 6b3e676efb5..dbaf0f8afc8 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -102,31 +102,15 @@ UDP2ConnectionImpl::UDP2ConnectionImpl(AcceptUDP2ConnectionImpl *accept, Continu UDP2ConnectionImpl::~UDP2ConnectionImpl() { - ink_assert(this->mutex->thread_holding == this_thread()); Debug("udp_con", "connection close"); this->mutex = nullptr; int fd = this->_fd; - if (this->nh != nullptr) { - this->nh->stopIO(this); - } - this->_fd = -1; if (fd != -1) { ::close(fd); } - - SList(UDP2Packet, out_link) aq(this->_send_queue.popall()); - UDP2Packet *p; - while ((p = aq.pop())) { - delete p; - } - - SList(UDP2Packet, in_link) aq2(this->_recv_queue.popall()); - while ((p = aq2.pop())) { - delete p; - } } void @@ -307,8 +291,20 @@ UDP2ConnectionImpl::endEvent(int event, void *data) return 0; } + SList(UDP2Packet, out_link) aq(this->_send_queue.popall()); + UDP2Packet *p; + while ((p = aq.pop())) { + delete p; + } + + SList(UDP2Packet, in_link) aq2(this->_recv_queue.popall()); + while ((p = aq2.pop())) { + delete p; + } + // kick out netevent from NetHandler this->nh->stopIO(this); + this->nh = nullptr; if (this->_accept_con != nullptr) { this->_accept_con->close_connection(this); } else { @@ -389,9 +385,9 @@ UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_b } // If this is a class D address (i.e. multicast address), use REUSEADDR. - if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(SOCKOPT_ON), sizeof(int)) < 0)) { - goto Lerror; - } + // if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(SOCKOPT_ON), sizeof(int)) < 0)) { + // goto Lerror; + // } if (ats_is_ip6(addr) && (res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { goto Lerror; @@ -952,6 +948,7 @@ AcceptUDP2ConnectionImpl::create_connection(const IpEndpoint &local, const IpEnd ink_assert(this->mutex->thread_holding == this->_thread); ink_assert(peer.isValid()); + char tmp[128] = {}; uint64_t hash = hash_code(peer); auto con = this->find_connection(peer); if (con != nullptr) { @@ -959,9 +956,14 @@ AcceptUDP2ConnectionImpl::create_connection(const IpEndpoint &local, const IpEnd } con = new UDP2ConnectionImpl(this, c, thread); - ink_release_assert(con->create_socket(local, 1048576, 1048576) >= 0); + if (con->create_socket(local, 1048576, 1048576) < 0) { + Error("Can not create socket %s", ats_ip_ntop(&peer.sa, tmp, 128)); + // ink_assert(0); + delete con; + return nullptr; + } + if (con->connect(&peer.sa) < 0) { - char tmp[128] = {}; Debug("udp_con", "Accept conn connect to peer failed: %s", ats_ip_ntop(&peer.sa, tmp, 128)); delete con; return nullptr; From fbd20cc3c5a5cfc4857a5ad84e5c8564825f920f Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 23 Dec 2019 10:25:56 +0800 Subject: [PATCH 10/12] run startIO in assigned thread --- iocore/net/UDPConnection.cc | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index dbaf0f8afc8..80132849e12 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -258,21 +258,19 @@ UDP2ConnectionImpl::mainEvent(int event, void *data) int UDP2ConnectionImpl::startEvent(int event, void *data) { - // ink_assert(this->mutex->thread_holding == this->_thread); ink_assert(this->refcount_dec() > 0); NetHandler *nh = get_NetHandler(this->_thread); - MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread()); - if (!lock.is_locked()) { - this->refcount_inc(); - SET_HANDLER(&UDP2ConnectionImpl::startEvent); - this->_thread->schedule_in(this, net_retry_delay); - return 1; + if (this->_thread == this_ethread()) { + Debug("udp_conn", "startEvent complete refcount: %d", this->refcount()); + SET_HANDLER(&UDP2ConnectionImpl::mainEvent); + ink_assert(nh->startIO(this) >= 0); + return 0; } - Debug("udp_conn", "startEvent complete refcount: %d", this->refcount()); - SET_HANDLER(&UDP2ConnectionImpl::mainEvent); - ink_assert(nh->startIO(this) >= 0); - return 0; + this->refcount_inc(); + SET_HANDLER(&UDP2ConnectionImpl::startEvent); + this->_thread->schedule_in(this, net_retry_delay); + return 1; } int From 0a8a639b85c163c712bdad2225bbf75a1936c4a0 Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 23 Dec 2019 10:33:19 +0800 Subject: [PATCH 11/12] fix bug --- iocore/net/UDPConnection.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 80132849e12..b1297b9f9bf 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -261,10 +261,13 @@ UDP2ConnectionImpl::startEvent(int event, void *data) ink_assert(this->refcount_dec() > 0); NetHandler *nh = get_NetHandler(this->_thread); if (this->_thread == this_ethread()) { - Debug("udp_conn", "startEvent complete refcount: %d", this->refcount()); - SET_HANDLER(&UDP2ConnectionImpl::mainEvent); - ink_assert(nh->startIO(this) >= 0); - return 0; + MUTEX_TRY_LOCK(lock, nh->mutex, this->_thread); + if (lock.is_locked()) { + Debug("udp_conn", "startEvent complete refcount: %d", this->refcount()); + SET_HANDLER(&UDP2ConnectionImpl::mainEvent); + ink_assert(nh->startIO(this) >= 0); + return 0; + } } this->refcount_inc(); From 0cb893b014e2aa569fcb8ca618b07b78c8c04c17 Mon Sep 17 00:00:00 2001 From: scw00 Date: Thu, 26 Dec 2019 15:45:33 +0800 Subject: [PATCH 12/12] add signal activity --- iocore/net/UDPConnection.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index b1297b9f9bf..b2cd859fda9 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -887,7 +887,10 @@ UDP2ConnectionImpl::receive(UDP2Packet *packet) return 0; } + // FIXME: we might send too many UDP_USER_READ_READY but only first one + // works. this->_reschedule(UDPEvents::UDP_USER_READ_READY, nullptr); + this->nh->signalActivity(); return 0; } @@ -902,6 +905,7 @@ UDP2ConnectionImpl::send(UDP2Packet *p) this->_reenable(&this->write.vio); } else { // cross thread + // FIXME the same as receive. this->_reschedule(UDPEvents::UDP_SEND_EVENT, nullptr); } this->nh->signalActivity();
%s%d
%s%d
%s%d