From 866f14dc6f18ac2ca3ba5454eb5b88e2385eee7d Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 16 Dec 2019 10:09:35 +0800 Subject: [PATCH 01/11] Add udp2 implementation --- iocore/net/AtomicEvent.h | 91 +++ iocore/net/Makefile.am | 40 +- iocore/net/UDPConnection.cc | 1089 ++++++++++++++++++++++++++++ iocore/net/UDPConnection.h | 167 +++++ iocore/net/UDPConnectionManager.cc | 126 ++++ iocore/net/UDPConnectionManager.h | 60 ++ iocore/net/UDPPacket.h | 52 ++ iocore/net/UDPProcessor.cc | 85 +++ iocore/net/UDPProcessor.h | 43 ++ iocore/net/libinknet_stub.cc | 2 +- iocore/net/test_UDPAcceptEcho.cc | 346 +++++++++ 11 files changed, 2098 insertions(+), 3 deletions(-) create mode 100644 iocore/net/AtomicEvent.h create mode 100644 iocore/net/UDPConnection.cc create mode 100644 iocore/net/UDPConnection.h create mode 100644 iocore/net/UDPConnectionManager.cc create mode 100644 iocore/net/UDPConnectionManager.h create mode 100644 iocore/net/UDPPacket.h create mode 100644 iocore/net/UDPProcessor.cc create mode 100644 iocore/net/UDPProcessor.h create mode 100644 iocore/net/test_UDPAcceptEcho.cc diff --git a/iocore/net/AtomicEvent.h b/iocore/net/AtomicEvent.h new file mode 100644 index 00000000000..de086c5ab75 --- /dev/null +++ b/iocore/net/AtomicEvent.h @@ -0,0 +1,91 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include + +#include "I_EventSystem.h" + +class AtomicEvent +{ +public: + bool + schedule(Continuation *c, EThread *t, int event, void *data, ink_hrtime delay = 0, int periodic = 0) + { + auto new_e = ::eventAllocator.alloc(); + new_e->init(c, delay, periodic); + new_e->callback_event = event; + new_e->cookie = data; + + Event *tmp = nullptr; + if (this->_e.compare_exchange_weak(tmp, new_e, std::memory_order_acq_rel)) { + t->schedule(new_e); + return true; + } else { + // we should not reschedule events when event is -1 or nullptr. Because the connection + // might be closed or already have events in plane. + new_e->free(); + return false; + } + } + + // thread unsafe. only target thread can cancel this event. + void + cancel() + { + Event *tmp = nullptr; + do { + if (tmp != nullptr) { + tmp->cancel(); + } + tmp = this->_e.load(std::memory_order_acquire); + if (tmp == reinterpret_cast(-1)) { + return; + } + } while (!this->_e.compare_exchange_weak(tmp, static_cast(nullptr), std::memory_order_acq_rel)); + + if (tmp != nullptr) { + tmp->cancel(); + } + } + + void + close() + { + Event *tmp = nullptr; + do { + if (tmp != nullptr) { + tmp->cancel(); + } + + tmp = this->_e.load(std::memory_order_acquire); + ink_release_assert(tmp != reinterpret_cast(-1)); + } while (!this->_e.compare_exchange_weak(tmp, reinterpret_cast(-1), std::memory_order_acq_rel)); + + if (tmp != nullptr) { + tmp->cancel(); + } + } + +private: + std::atomic _e{}; +}; diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am index 2ec6f4e668c..aa4e82dd7cb 100644 --- a/iocore/net/Makefile.am +++ b/iocore/net/Makefile.am @@ -37,7 +37,7 @@ AM_CPPFLAGS += \ TESTS = $(check_PROGRAMS) -check_PROGRAMS = test_certlookup test_UDPNet +check_PROGRAMS = test_certlookup test_UDPNet test_UDPAcceptEcho noinst_LIBRARIES = libinknet.a test_certlookup_LDFLAGS = \ @@ -163,7 +163,42 @@ libinknet_a_SOURCES = \ UnixNetVConnection.cc \ UnixUDPConnection.cc \ UnixUDPNet.cc \ - SSLDynlock.cc + SSLDynlock.cc \ + UDPProcessor.cc \ + UDPConnection.cc \ + UDPConnectionManager.cc + +test_UDPAcceptEcho_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + $(iocore_include_dirs) \ + -I$(abs_top_srcdir)/proxy \ + -I$(abs_top_srcdir)/proxy/hdrs \ + -I$(abs_top_srcdir)/proxy/http \ + -I$(abs_top_srcdir)/proxy/logging \ + -I$(abs_top_srcdir)/mgmt \ + -I$(abs_top_srcdir)/mgmt/utils \ + @OPENSSL_INCLUDES@ + +test_UDPAcceptEcho_LDFLAGS = \ + @AM_LDFLAGS@ \ + @OPENSSL_LDFLAGS@ \ + @YAMLCPP_LDFLAGS@ + +test_UDPAcceptEcho_LDADD = \ + libinknet.a \ + $(top_builddir)/iocore/eventsystem/libinkevent.a \ + $(top_builddir)/mgmt/libmgmt_p.la \ + $(top_builddir)/lib/records/librecords_p.a \ + $(top_builddir)/src/tscore/libtscore.la $(top_builddir)/src/tscpp/util/libtscpputil.la \ + $(top_builddir)/proxy/ParentSelectionStrategy.o \ + @HWLOC_LIBS@ @OPENSSL_LIBS@ @LIBPCRE@ @YAMLCPP_LIBS@ + +test_UDPAcceptEcho_SOURCES = \ + libinknet_stub.cc \ + UnixUDPConnection.cc \ + UnixUDPNet.cc \ + test_UDPAcceptEcho.cc + if ENABLE_QUIC libinknet_a_SOURCES += \ @@ -192,3 +227,4 @@ include $(top_srcdir)/build/tidy.mk clang-tidy-local: $(DIST_SOURCES) $(CXX_Clang_Tidy) + diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc new file mode 100644 index 00000000000..9723db20ef7 --- /dev/null +++ b/iocore/net/UDPConnection.cc @@ -0,0 +1,1089 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include + +#include "UDPConnection.h" +#include "UDPConnectionManager.h" + +#include "tscore/ink_atomic.h" + +static const char * +udp_event_name(UDP2ConnectionImpl::UDPEvents e) +{ + switch (e) { + case UDP2ConnectionImpl::UDPEvents::UDP_START_EVENT: + return "UDP_START_EVENT"; + case UDP2ConnectionImpl::UDPEvents::UDP_CONNECT_EVENT: + return "UDP_CONNECT_EVENT"; + case UDP2ConnectionImpl::UDPEvents::UDP_USER_READ_READY: + return "UDP_USER_READ_READY"; + case UDP2ConnectionImpl::UDPEvents::UDP_SEND_READY: + return "UDP_SEND_READY"; + default: + return "UNKNOWN EVENT"; + }; + + return nullptr; +} + +static const char * +udp_event_name(int e) +{ + return udp_event_name(static_cast(e)); +} + +// +// Reschedule a NetEvent by moving it +// onto or off of the ready_list +// +static inline void +read_reschedule(NetHandler *nh, NetEvent *vc) +{ + vc->ep.refresh(EVENTIO_READ); + if (vc->read.triggered && vc->read.enabled) { + nh->read_ready_list.in_or_enqueue(vc); + } else { + nh->read_ready_list.remove(vc); + } +} + +static inline void +write_reschedule(NetHandler *nh, NetEvent *vc) +{ + vc->ep.refresh(EVENTIO_WRITE); + if (vc->write.triggered && vc->write.enabled) { + nh->write_ready_list.in_or_enqueue(vc); + } else { + nh->write_ready_list.remove(vc); + } +} + +// +// UDP2ConnectionImpl +// +UDP2ConnectionImpl::UDP2ConnectionImpl(UDP2ConnectionManager &manager, Continuation *con, EThread *thread) + : _con(con), _thread(thread), _manager(manager) +{ + this->mutex = con->mutex; + this->read.enabled = 1; // read enabled is always true because we expected all data; + if (thread == nullptr) { + this->_thread = this_ethread(); + } + if (this->mutex == nullptr) { + this->mutex = new_ProxyMutex(); + } + SET_HANDLER(&UDP2ConnectionImpl::startEvent); +} + +UDP2ConnectionImpl::~UDP2ConnectionImpl() +{ + Debug("udp_con", "destroy"); + + int fd = this->_fd; + + this->_fd = -1; + if (fd != -1) { + ::close(fd); + } +} + +void +UDP2ConnectionImpl::free(EThread *t) +{ + Debug("udp_con", "free connection"); + this->mutex = nullptr; + + this->_user_read_ready_event.close(); + this->_send_ready_event.close(); + this->_close_event(UDPEvents::UDP_START_EVENT); + this->_close_event(UDPEvents::UDP_CONNECT_EVENT); + + this->read.enabled = 0; + this->read.triggered = 0; + + this->write.enabled = 0; + this->write.triggered = 0; + this->nh->stopIO(this); + + int fd = this->_fd; + + this->_fd = -1; + if (fd != -1) { + ::close(fd); + } + + this->_manager.close_connection(this, ""); +} + +int +UDP2ConnectionImpl::callback(int event, void *data) +{ + if (this->_con == nullptr) { + return 0; + } + + MUTEX_TRY_LOCK(lock, this->_con->mutex == nullptr ? this->mutex : this->_con->mutex, this_ethread()); + if (!lock.is_locked()) { + // TODO reuse cached event + Debug("udpcon", "callback get con lock failed"); + this->_reschedule(UDPEvents::UDP_USER_READ_READY, nullptr); + return 0; + } + return this->_con->handleEvent(event, data); +} + +void +UDP2ConnectionImpl::set_inactivity_timeout(ink_hrtime timeout_in) +{ +} + +EThread * +UDP2ConnectionImpl::get_thread() +{ + return this->_thread; +} + +int +UDP2ConnectionImpl::close() +{ + // detach contiuation. we should not callback to con after `close` has been called + this->_con = nullptr; + this->mutex = this->_thread->mutex; + + SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); + UDP2Packet *p; + while ((p = aq.pop())) { + delete p; + } + + if (this->_send_list.empty() && this->_send_queue.empty()) { + this->free(nullptr); + return 0; + } + + this->_reenable(&this->write.vio); + this->nh->signalActivity(); + return 0; +} + +int +UDP2ConnectionImpl::get_fd() +{ + return this->_fd; +} + +Ptr & +UDP2ConnectionImpl::get_mutex() +{ + return this->mutex; +} + +ContFlags & +UDP2ConnectionImpl::get_control_flags() +{ + return _cont_flags; +} + +bool +UDP2ConnectionImpl::_is_closed() const +{ + return this->_con == nullptr; +} + +int +UDP2ConnectionImpl::startEvent(int event, void *data) +{ + Debug("udp_con", "startEvent %s-%d", udp_event_name(event), event); + this->_close_event(event); + switch (static_cast(event)) { + case UDPEvents::UDP_CONNECT_EVENT: + this->connect(&this->_to.sa); + break; + case UDPEvents::UDP_START_EVENT: { + NetHandler *nh = get_NetHandler(this->_thread); + if (this->_thread == this_ethread()) { + MUTEX_TRY_LOCK(lock, nh->mutex, this->_thread); + if (lock.is_locked()) { + SET_HANDLER(&UDP2ConnectionImpl::mainEvent); + ink_assert(nh->startIO(this) >= 0); + // reenable read since there might be some packets in socket's buffer. + if (!this->_recv_list.empty() || !this->_recv_queue.empty()) { + this->callback(NET_EVENT_DATAGRAM_READ_READY, this); + } + break; + } + } + this->_reschedule(UDPEvents::UDP_START_EVENT, nullptr, net_retry_delay); + break; + } + default: + ink_release_assert(0); + break; + } + + if (this->_is_closed() && (this->_send_queue.empty() && this->_send_list.empty())) { + this->free(nullptr); + } else if (this->_is_closed()) { + this->_reenable(&this->write.vio); + this->nh->signalActivity(); + } + return 0; +} + +int +UDP2ConnectionImpl::mainEvent(int event, void *data) +{ + ink_assert(this->mutex->thread_holding == this->_thread); + this->_close_event(event); + switch (static_cast(event)) { + case UDPEvents::UDP_CONNECT_EVENT: + this->connect(&this->_to.sa); + break; + case UDPEvents::UDP_USER_READ_READY: + this->callback(NET_EVENT_DATAGRAM_READ_READY, this); + break; + default: + Debug("udp_con", "unknown events: %d", event); + ink_release_assert(0); + break; + } + + if (this->_is_closed() && (this->_send_queue.empty() && this->_send_list.empty())) { + this->free(nullptr); + } else if (this->_is_closed()) { + this->_reenable(&this->write.vio); + this->nh->signalActivity(); + } + + return 0; +} + +int +UDP2ConnectionImpl::start_io() +{ + return this->startEvent(0, nullptr); +} + +int +UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_buf) +{ + int res = 0; + int fd = -1; + IpEndpoint local_addr{}; + int local_addr_len = sizeof(local_addr); + if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0) { + goto Lerror; + } + + fd = res; + if ((res = safe_fcntl(fd, F_SETFL, O_NONBLOCK)) < 0) { + goto Lerror; + } + + if (recv_buf > 0) { + if (unlikely(socketManager.set_rcvbuf_size(fd, recv_buf))) { + Debug("udp_con", "set_dnsbuf_size(%d) failed", recv_buf); + } + } + if (send_buf > 0) { + if (unlikely(socketManager.set_sndbuf_size(fd, send_buf))) { + Debug("udp_con", "set_dnsbuf_size(%d) failed", send_buf); + } + } + + if (addr->sa_family == AF_INET) { + bool succeeded = false; + int enable = 1; +#ifdef IP_PKTINFO + if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif +#ifdef IP_RECVDSTADDR + if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif + if (!succeeded) { + Debug("udp_con", "setsockeopt for pktinfo failed"); + goto Lerror; + } + } else if (addr->sa_family == AF_INET6) { + bool succeeded = false; + int enable = 1; +#ifdef IPV6_PKTINFO + if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif +#ifdef IPV6_RECVPKTINFO + if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif + if (!succeeded) { + Debug("udp_con", "setsockeopt for pktinfo failed"); + goto Lerror; + } + } + + // If this is a class D address (i.e. multicast address), use REUSEADDR. + // if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(SOCKOPT_ON), sizeof(int)) < 0)) { + // goto Lerror; + // } + + if (ats_is_ip6(addr) && (res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { + goto Lerror; + } + + if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, SOCKOPT_ON, sizeof(int))) < 0) { + goto Lerror; + } + + if (-1 == socketManager.ink_bind(fd, addr, ats_ip_size(addr))) { + char buff[INET6_ADDRPORTSTRLEN]; + Debug("udp_con", "ink bind failed on %s %s", ats_ip_nptop(addr, buff, sizeof(buff)), strerror(errno)); + goto Lerror; + } + + if ((res = safe_getsockname(fd, &local_addr.sa, &local_addr_len)) < 0) { + Debug("udp_con", "CreateUdpsocket: getsockname didn't work"); + goto Lerror; + } + + ats_ip_copy(&this->_from, &local_addr.sa); + this->_fd = fd; + Debug("udp_con", "creating a udp socket port = %d---success", ats_ip_port_host_order(local_addr)); + return 0; +Lerror: + Debug("udp_con", "creating a udp socket port = %d---soft failure", ats_ip_port_host_order(local_addr)); + if (fd != -1) { + socketManager.close(fd); + } + + return -errno; +} + +IpEndpoint +UDP2ConnectionImpl::from() +{ + return this->_from; +} + +IpEndpoint +UDP2ConnectionImpl::to() +{ + return this->_to; +} + +int +UDP2ConnectionImpl::_connect() +{ + ink_assert(this->_fd != NO_FD); + ink_assert(this->_to.port() != 0); + int res = ::connect(this->_fd, &this->_to.sa, ats_ip_size(&this->_to.sa)); + if (res >= 0) { + this->_connected = true; + return 0; + } + + return -errno; +} + +int +UDP2ConnectionImpl::connect(sockaddr const *addr) +{ + if (this->_to.port() == 0) { + ats_ip_copy(&this->_to, addr); + } + int res = this->_connect(); + if (res < 0) { + if ((res == -EINPROGRESS) || (res == -EWOULDBLOCK)) { + this->_reschedule(UDPEvents::UDP_CONNECT_EVENT, nullptr); + return 0; + } + return this->callback(NET_EVENT_DATAGRAM_CONNECT_ERROR, this); + } + return this->callback(NET_EVENT_DATAGRAM_CONNECT_SUCCESS, this); +} + +bool +UDP2ConnectionImpl::is_connected() const +{ + return this->_connected; +} + +void +UDP2ConnectionImpl::set_continuation(Continuation *con) +{ + // ink_assert(this->mutex == nullptr); + // rebind mutex; + this->_con = con; + this->mutex = con->mutex; + if (this->mutex == nullptr) { + this->mutex = new_ProxyMutex(); + } +} + +void +UDP2ConnectionImpl::bind_thread(EThread *thread) +{ + this->_thread = thread; +} + +void +UDP2ConnectionImpl::_reschedule(UDPEvents e, void *data, int64_t delay) +{ + Debug("udp_con", "schedule event %s", udp_event_name(e)); + Event **event = nullptr; + switch (e) { + case UDPEvents::UDP_START_EVENT: + event = &this->_start_event; + break; + case UDPEvents::UDP_CONNECT_EVENT: + event = &this->_connect_event; + break; + case UDPEvents::UDP_USER_READ_READY: + this->_user_read_ready_event.schedule(this, this->_thread, static_cast(e), data, delay); + return; + case UDPEvents::UDP_SEND_READY: + this->_send_ready_event.schedule(this, this->_thread, static_cast(e), data, delay); + return; + default: + ink_release_assert(!"unknown events"); + break; + } + + if (*event != nullptr) { + (*event)->cancel(); + (*event) = nullptr; + } + + if (delay) { + *event = this->_thread->schedule_in(this, delay, static_cast(e), data); + } else { + *event = this->_thread->schedule_imm(this, static_cast(e), data); + } +} + +void +UDP2ConnectionImpl::reschedule_read() +{ + this->_reschedule(UDPEvents::UDP_USER_READ_READY, nullptr, 0); +} + +void +UDP2ConnectionImpl::reschedule_write() +{ + this->_reschedule(UDPEvents::UDP_SEND_READY, nullptr, 0); +} + +void +UDP2ConnectionImpl::_close_event(int e) +{ + this->_close_event(static_cast(e)); +} + +void +UDP2ConnectionImpl::_close_event(UDPEvents e) +{ + Event **ptr = nullptr; + switch (e) { + case UDPEvents::UDP_START_EVENT: + ptr = &this->_start_event; + break; + case UDPEvents::UDP_CONNECT_EVENT: + ptr = &this->_connect_event; + break; + case UDPEvents::UDP_USER_READ_READY: + ink_assert(this->_thread == this_ethread()); + this->_user_read_ready_event.cancel(); + return; + case UDPEvents::UDP_SEND_READY: + ink_assert(this->_thread == this_ethread()); + this->_send_ready_event.cancel(); + return; + default: + ink_release_assert(!"unknown ptrs"); + break; + } + + if (*ptr != nullptr) { + (*ptr)->cancel(); + *ptr = nullptr; + } +} + +void +UDP2ConnectionImpl::net_read_io(NetHandler *nh, EThread *thread) +{ + ink_assert(this->nh = nh); + ink_assert(this->nh->mutex->thread_holding == thread); + MUTEX_TRY_LOCK(lock, this->mutex, thread); + if (!lock.is_locked()) { + read_reschedule(nh, this); + return; + } + + NetState *s = &this->read; + if (!s->enabled) { + read_disable(nh, this); + return; + } + + this->_read_from_net(nh, thread, true); + + read_reschedule(nh, this); +} + +void +UDP2ConnectionImpl::_read_from_net(NetHandler *nh, EThread *thread, bool callback) +{ + // receive packet and queue onto UDPConnection. + // don't call back connection at this time. + int64_t r = 0; + int count = 0; + + Ptr chain, next_chain; + struct iovec tiovec[MAX_NIOV]; + int64_t size_index = BUFFER_SIZE_INDEX_2K; + int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index); + // The max length of receive buffer is 32 * buffer_size (2048) = 65536 bytes. + // Because the 'UDP Length' is type of uint16_t defined in RFC 768. + // And there is 8 octets in 'User Datagram Header' which means the max length of payload is no more than 65527 bytes. + do { + // create IOBufferBlock chain to receive data + unsigned int niov; + IOBufferBlock *b, *last; + + // build struct iov + // reuse the block in chain if available + b = chain.get(); + last = nullptr; + for (niov = 0; niov < MAX_NIOV; niov++) { + if (b == nullptr) { + b = new_IOBufferBlock(); + b->alloc(size_index); + if (last == nullptr) { + chain = b; + } else { + last->next = b; + } + } + + tiovec[niov].iov_base = b->buf(); + tiovec[niov].iov_len = b->block_size(); + + last = b; + b = b->next.get(); + } + + UDP2PacketUPtr p = std::make_unique(); + r = this->_read(tiovec, niov, p->from, p->to); + if (r <= 0) { + if (r == -EAGAIN || r == -ENOTCONN) { + this->read.triggered = 0; + break; + } + + if (callback) { + this->callback(NET_EVENT_DATAGRAM_READ_ERROR, this); + } + return; + } + + // fill the IOBufferBlock chain + int64_t saved = r; + b = chain.get(); + while (b && saved > 0) { + if (saved > buffer_size) { + b->fill(buffer_size); + saved -= buffer_size; + b = b->next.get(); + } else { + b->fill(saved); + saved = 0; + next_chain = b->next.get(); + b->next = nullptr; + } + } + + p->chain = chain; + std::string str(p->chain->start(), p->chain->read_avail()); + if (str == "helloword1") { + std::cout << "mother fuck" << std::endl; + } + std::cout << "fuck read: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; + + // queue onto the UDPConnection + this->_recv_queue.push(p.get()); + p.release(); + + // reload the unused block + chain = next_chain; + next_chain = nullptr; + count++; + } while (r > 0); + + Debug("udp_con", "read %d packets from net", count); + + if (callback && (!this->_recv_queue.empty() || !this->_recv_list.empty())) { + this->callback(NET_EVENT_DATAGRAM_READ_READY, this); + } + return; +} + +int +UDP2ConnectionImpl::_read(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to) +{ + ink_release_assert(this->_from.isValid() && this->_to.isValid()); + int rc = socketManager.readv(this->get_fd(), iov, len); + if (rc <= 0) { + return rc; + } + + ats_ip_copy(&from, &this->_to.sa); + ats_ip_copy(&to, &this->_from.sa); + return rc; +} + +void +UDP2ConnectionImpl::net_write_io(NetHandler *nh, EThread *thread) +{ + ink_assert(this->nh = nh); + ink_assert(this->nh->mutex->thread_holding == thread); + MUTEX_TRY_LOCK(lock, this->mutex, thread); + if (!lock.is_locked()) { + write_reschedule(nh, this); + return; + } + + MUTEX_TRY_LOCK(lock2, this->nh->mutex, thread); + if (!lock2.is_locked()) { + read_reschedule(nh, this); + return; + } + + NetState *s = &this->write; + if (!s->enabled) { + write_disable(nh, this); + return; + } + + SList(UDP2Packet, out_link) aq(this->_send_queue.popall()); + UDP2Packet *p; + Queue st; + while ((p = aq.pop())) { + st.push(p); + } + + while ((p = st.pop())) { + this->_send_list.push_back(UDP2PacketUPtr(p)); + } + + int count = 0; + while (!this->_send_list.empty()) { + auto p = this->_send_list.front().get(); + + int rc = this->_send(p); + if (rc >= 0) { + count++; + std::cout << "fuck send: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; + this->_send_list.pop_front(); + continue; + } + + if (errno == EAGAIN) { + this->write.triggered = 0; + write_reschedule(nh, this); + break; + } else { + this->write.triggered = 0; + this->callback(NET_EVENT_DATAGRAM_WRITE_ERROR, this); + } + } + + if (count > 0) { + this->callback(NET_EVENT_DATAGRAM_WRITE_READY, this); + } + + if (this->_is_closed() && (this->_send_queue.empty() && this->_send_list.empty())) { + this->free(nullptr); + } + + return; +} + +int +UDP2ConnectionImpl::_send(UDP2Packet *p) +{ + ink_assert(this->is_connected()); + struct iovec iov[MAX_NIOV]; + int n, iov_len = 0; + + for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { + iov[iov_len].iov_base = static_cast(b->start()); + iov[iov_len].iov_len = b->size(); + iov_len++; + } + + n = socketManager.writev(this->_fd, iov, iov_len); + if (n >= 0) { + return n; + } + + Debug("udp_con", "writev failed: %s", strerror(errno)); + return -errno; +} + +UDP2Packet * +UDP2ConnectionImpl::recv() +{ + // user should call recv immediatly when UDP2Connection callback to + // contiuation. Since the mutex is already grabed from eventsystem or + // NetHandler, we don't need explicit take lock here. + ink_assert(!this->_is_closed()); + ink_assert(this->mutex->thread_holding == this->_thread); + SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); + UDP2Packet *t; + Queue st; + while ((t = aq.pop())) { + st.push(t); + } + + while ((t = st.pop())) { + this->_recv_list.push_back(UDP2PacketUPtr(t)); + } + + if (this->_recv_list.empty()) { + return nullptr; + } + + auto p = std::move(this->_recv_list.front()); + this->_recv_list.pop_front(); + auto ret = p.get(); + p.release(); + return ret; +} + +void +UDP2ConnectionImpl::_reenable(VIO *vio) +{ + NetState *state = &this->write; + if (vio != &this->write.vio) { + state = &this->read; + } + + Debug("udp_con", "udp connection reenable %s", vio == &this->read.vio ? "read" : "write"); + state->enabled = 1; + ink_release_assert(!closed); + auto t = this_ethread(); + if (nh->mutex->thread_holding == t) { + if (vio == &read.vio) { + ep.modify(EVENTIO_READ); + ep.refresh(EVENTIO_READ); + if (read.triggered) { + nh->read_ready_list.in_or_enqueue(this); + } else { + nh->read_ready_list.remove(this); + } + } else { + ep.modify(EVENTIO_WRITE); + ep.refresh(EVENTIO_WRITE); + if (write.triggered) { + nh->write_ready_list.in_or_enqueue(this); + } else { + nh->write_ready_list.remove(this); + } + } + } else { + MUTEX_TRY_LOCK(lock, nh->mutex, t); + if (!lock.is_locked()) { + if (vio == &read.vio) { + int isin = ink_atomic_swap(&read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(this); + } + } else { + int isin = ink_atomic_swap(&write.in_enabled_list, 1); + if (!isin) { + nh->write_enable_list.push(this); + } + } + if (likely(nh->thread)) { + nh->thread->tail_cb->signalActivity(); + } else if (nh->trigger_event) { + nh->trigger_event->ethread->tail_cb->signalActivity(); + } + } else { + if (vio == &read.vio) { + ep.modify(EVENTIO_READ); + ep.refresh(EVENTIO_READ); + if (read.triggered) { + nh->read_ready_list.in_or_enqueue(this); + } else { + nh->read_ready_list.remove(this); + } + } else { + ep.modify(EVENTIO_WRITE); + ep.refresh(EVENTIO_WRITE); + if (write.triggered) { + nh->write_ready_list.in_or_enqueue(this); + } else { + nh->write_ready_list.remove(this); + } + } + } + } +} + +int +UDP2ConnectionImpl::dispatch(UDP2Packet *packet) +{ + if (this->_is_closed()) { + return 0; + } + + this->_recv_queue.push(packet); + if (this->_thread == this_ethread()) { + this->callback(NET_EVENT_DATAGRAM_READ_READY, nullptr); + return 0; + } + + return 0; +} + +int +UDP2ConnectionImpl::send(UDP2Packet *p, bool flush) +{ + ink_assert(!this->_is_closed()); + ink_assert(this->is_connected() || p->to.isValid()); + this->_send_queue.push(p); + if (flush) { + this->_reenable(&this->write.vio); + } + return 0; +} + +void +UDP2ConnectionImpl::flush() +{ + this->_reenable(&this->write.vio); +} + +// +// AcceptUDP2ConnectionImpl +// +UDP2ConnectionImpl * +AcceptUDP2ConnectionImpl::create_sub_connection(const IpEndpoint &local, const IpEndpoint &peer, Continuation *c, EThread *thread) +{ + ink_assert(this->mutex->thread_holding == this->_thread); + ink_assert(peer.isValid()); + char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; + + auto con = this->_manager.create_udp_connection(c, thread, local, peer); + ink_release_assert(con->is_connected()); + + // since new socket is established, we should drain all udp packet which is already in accept udp + // socket buffer. So read from socket until the EAGIN. + this->_read_from_net(nh, this->_thread, false); + + // in this time every packets read from accept will dispatch to new conn + // So we need to take old packet which already in list. + SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); + Queue st; + UDP2Packet *p; + while ((p = aq.pop())) { + st.push(p); + } + + while ((p = st.pop())) { + this->_recv_list.push_back(UDP2PacketUPtr(p)); + } + + // unique set list. + // std::set awakenup_list; + int count = 0; + UDP2ConnectionImpl *tmp = nullptr; + for (auto it = this->_recv_list.begin(); it != this->_recv_list.end();) { + if ((tmp = this->_manager.find_connection(&(*it)->to.sa, &(*it)->from.sa))) { + tmp->dispatch((*it).get()); + (*it).release(); + it = this->_recv_list.erase(it); + count++; + // awakenup_list.emplace(tmp); + } else { + it++; + } + } + + // for (auto it : awakenup_list) { + // it->reschedule_read(); + // if (it->nh) { + // it->nh->signalActivity(); + // } + // } + ink_assert(con->start_io() >= 0); + + Debug("udp_accept", "create udp connection %s ----> %s dispatch %d keep packets: %lu", + ats_ip_nptop(&local.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), + ats_ip_nptop(&peer.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN), count, this->_recv_list.size()); + return con; +} + +void +AcceptUDP2ConnectionImpl::net_read_io(NetHandler *nh, EThread *t) +{ + this->_read_from_net(nh, this->_thread, false); + + SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); + Queue st; + UDP2Packet *p; + while ((p = aq.pop())) { + st.push(p); + } + + std::set awakenup_list; + UDP2ConnectionImpl *con = nullptr; + while ((p = st.pop())) { + if ((con = this->_manager.find_connection(&p->to.sa, &p->from.sa))) { + con->dispatch(p); + awakenup_list.emplace(con); + continue; + } + + this->_recv_list.push_back(UDP2PacketUPtr(p)); + } + + for (auto it : awakenup_list) { + it->reschedule_read(); + } + + Debug("udp_accept", "dispatch %lu packets to other connection, manager size: %d", awakenup_list.size(), this->_manager.size()); + if (!this->_recv_queue.empty() || !this->_recv_list.empty()) { + this->callback(NET_EVENT_DATAGRAM_READ_READY, this); + } +} + +int +AcceptUDP2ConnectionImpl::_send(UDP2Packet *p) +{ + ink_assert(p->to.isValid()); + ink_assert(this->is_connected() == false); + struct msghdr msg; + struct iovec iov[MAX_NIOV]; + int real_len = 0; + int n, iov_len = 0; + +#if !defined(solaris) + msg.msg_control = nullptr; + msg.msg_controllen = 0; + msg.msg_flags = 0; +#endif + msg.msg_name = reinterpret_cast(&p->to.sa); + msg.msg_namelen = ats_ip_size(p->to); + iov_len = 0; + + for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { + iov[iov_len].iov_base = static_cast(b->start()); + iov[iov_len].iov_len = b->size(); + real_len += iov[iov_len].iov_len; + iov_len++; + } + + msg.msg_iov = iov; + msg.msg_iovlen = iov_len; + + n = socketManager.sendmsg(this->get_fd(), &msg, 0); + if (n >= 0) { + char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; + Debug("udp_accept", "send packet %s ----> %s", ats_ip_nptop(&p->from.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), + ats_ip_nptop(&p->to.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN)); + return n; + } + + Debug("udp_conn", "send from external thread failed: %d-%s", errno, strerror(errno)); + return -errno; +} + +int +AcceptUDP2ConnectionImpl::_read(struct iovec *iov, int len, IpEndpoint &fromaddr, IpEndpoint &toaddr) +{ + struct msghdr msg; + int toaddr_len = sizeof(toaddr); + char *cbuf[1024]; + msg.msg_name = &fromaddr.sin6; + msg.msg_namelen = sizeof(fromaddr); + msg.msg_iov = iov; + msg.msg_iovlen = len; + msg.msg_control = cbuf; + msg.msg_controllen = sizeof(cbuf); + + int rc = socketManager.recvmsg(this->get_fd(), &msg, 0); + if (rc <= 0) { + return rc; + } + + // truncated check + if (msg.msg_flags & MSG_TRUNC) { + Debug("udp-read", "The UDP packet is truncated"); + ink_assert(!"truncate should not happen, if so please increase MAX_NIOV"); + rc = -0x12345; + return rc; + } + + safe_getsockname(this->get_fd(), &toaddr.sa, &toaddr_len); + for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + switch (cmsg->cmsg_type) { +#ifdef IP_PKTINFO + case IP_PKTINFO: + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); + reinterpret_cast(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr; + } + break; +#endif +#ifdef IP_RECVDSTADDR + case IP_RECVDSTADDR: + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_addr *addr = reinterpret_cast(CMSG_DATA(cmsg)); + reinterpret_cast(&toaddr)->sin_addr.s_addr = addr->s_addr; + } + break; +#endif +#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) + case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too + if (cmsg->cmsg_level == IPPROTO_IPV6) { + struct in6_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); + memcpy(toaddr.sin6.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16); + } + break; +#endif + } + } + + char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; + Debug("udp_accept", "read packet %s ----> %s", ats_ip_nptop(&fromaddr.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), + ats_ip_nptop(&toaddr.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN)); + ink_release_assert(!ats_ip_addr_port_eq(&fromaddr.sa, &toaddr.sa)); + return rc; +} diff --git a/iocore/net/UDPConnection.h b/iocore/net/UDPConnection.h new file mode 100644 index 00000000000..9c0aa98cb2e --- /dev/null +++ b/iocore/net/UDPConnection.h @@ -0,0 +1,167 @@ +/** @file + * + * @section license License + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "tscore/ink_sock.h" +#include "I_EventSystem.h" +#include "P_Net.h" +#include "NetEvent.h" +#include "UDPPacket.h" +#include "AtomicEvent.h" + +#define NET_EVENT_DATAGRAM_CONNECT_SUCCESS (NET_EVENT_EVENTS_START + 170) +#define NET_EVENT_DATAGRAM_CONNECT_ERROR (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) +#define NET_EVENT_DATAGRAM_WRITE_READY (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) + +class AcceptUDP2ConnectionImpl; +class UDP2ConnectionManager; + +class UDP2Connection : public NetEvent +{ +public: + virtual ~UDP2Connection() {} + virtual int send(UDP2Packet *, bool flush = true) = 0; + virtual UDP2Packet *recv() = 0; + virtual void flush() = 0; + + virtual int close() = 0; + virtual void set_continuation(Continuation *con) = 0; + virtual IpEndpoint from() = 0; + virtual IpEndpoint to() = 0; + + SLINK(UDP2Connection, closed_link); +}; + +class UDP2ConnectionImpl : public UDP2Connection, public Continuation +{ +public: + UDP2ConnectionImpl() = delete; + // independent allocate. + UDP2ConnectionImpl(UDP2ConnectionManager &manager, Continuation *con, EThread *ethread = nullptr); + ~UDP2ConnectionImpl(); + + enum class UDPEvents : uint8_t { + UDP_START_EVENT, + UDP_CONNECT_EVENT, + UDP_USER_READ_READY, + UDP_SEND_READY, + }; + + // NetEventHandler + virtual void net_read_io(NetHandler *nh, EThread *lthread) override; + void net_write_io(NetHandler *nh, EThread *lthread) override; + void free(EThread *t) override; + int callback(int event = CONTINUATION_EVENT_NONE, void *data = nullptr) override; + void set_inactivity_timeout(ink_hrtime timeout_in) override; + EThread *get_thread() override; + int close() override; + int get_fd() override; + Ptr &get_mutex() override; + ContFlags &get_control_flags() override; + int start_io(); + + // UDP2Connection + int send(UDP2Packet *packet, bool flush = true) override; + void flush() override; + UDP2Packet *recv() override; + IpEndpoint from() override; + IpEndpoint to() override; + void set_continuation(Continuation *con) override; + + int create_socket(sockaddr const *addr, int recv_buf = 0, int send_buf = 0); + int connect(sockaddr const *addr); + bool is_connected() const; + void bind_thread(EThread *thread); + int dispatch(UDP2Packet *packet); + void reschedule_read(); + void reschedule_write(); + + int startEvent(int event, void *data); + int mainEvent(int event, void *data); + +protected: + // control max data size per read, This can be calculated as MAX_NIOV * 1024 / read + static constexpr int MAX_NIOV = 1; + + bool _is_closed() const; + void _reschedule(UDPEvents e, void *data, int64_t delay = 0); + void _reenable(VIO *vio); + void _read_from_net(NetHandler *nh, EThread *t, bool callback = true); + virtual int _send(UDP2Packet *p); + virtual int _read(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to); + + ASLL(UDP2Packet, in_link) _recv_queue; + std::deque _recv_list; + + Continuation *_con = nullptr; + EThread *_thread = nullptr; + + UDP2ConnectionManager &_manager; + +private: + ASLL(UDP2Packet, out_link) _send_queue; + + // internal schedule. + void _close_event(UDPEvents e); + void _close_event(int e); + int _connect(); + + IpEndpoint _from{}; + IpEndpoint _to{}; + + int _fd = -1; + bool _connected = false; + Event *_start_event = nullptr; + Event *_connect_event = nullptr; + AtomicEvent _send_ready_event; + AtomicEvent _user_read_ready_event; + + // TODO removed + NetVCOptions _options{}; + ContFlags _cont_flags{}; + + std::deque _send_list; +}; + +// Accept UDP2Connection is ranning in ET_UDP, and dispatch UDPPacket to different sub connections in _connection_map +// So PacketHandler should manager all AcceptUDP2Connection to find the correct connection across diffierent listen +// Addrs. +// FIXME: In current implementable, every AcceptUDP2ConnectionImpl are independent. That means the client should always +// send to the same local addr. +class AcceptUDP2ConnectionImpl : public UDP2ConnectionImpl +{ +public: + AcceptUDP2ConnectionImpl(UDP2ConnectionManager &manager, Continuation *c, EThread *thread) + : UDP2ConnectionImpl(manager, c, thread) + { + } + + AcceptUDP2ConnectionImpl() = delete; + UDP2ConnectionImpl *create_sub_connection(const IpEndpoint &from, const IpEndpoint &to, Continuation *c, EThread *thread); + + void net_read_io(NetHandler *nh, EThread *lthread) override; + +private: + int _send(UDP2Packet *p) override; + int _read(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to) override; +}; diff --git a/iocore/net/UDPConnectionManager.cc b/iocore/net/UDPConnectionManager.cc new file mode 100644 index 00000000000..82e466d64d5 --- /dev/null +++ b/iocore/net/UDPConnectionManager.cc @@ -0,0 +1,126 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "UDPConnectionManager.h" + +UDP2ConnectionImpl * +UDP2ConnectionManager::create_udp_connection(Continuation *c, EThread *ethread, sockaddr const *local, sockaddr const *peer, + int recv_buf, int send_buf) +{ + ink_release_assert(this->mutex->thread_holding == this_ethread()); + auto hash = ats_ip_port_hash(local) ^ ats_ip_port_hash(peer); + auto it = this->_routes.find(hash); + if (it != this->_routes.end()) { + for (auto itt : it->second) { + auto local = itt->from(); + auto peer = itt->to(); + if (ats_ip_addr_port_eq(&local.sa, local) && ats_ip_addr_port_eq(&peer.sa, peer)) { + return itt; + } + } + } + + // not found + ink_assert(local != nullptr); + ink_assert(peer != nullptr); + auto con = new UDP2ConnectionImpl(*this, c, ethread); + if (con->create_socket(local, recv_buf, send_buf) != 0) { + delete con; + return nullptr; + } + + if (con->connect(peer) < 0) { + delete con; + return nullptr; + } + + ++this->_size; + this->_routes.emplace(hash, std::list(1, con)); + return con; +} + +AcceptUDP2ConnectionImpl * +UDP2ConnectionManager::create_accept_udp_connection(Continuation *c, EThread *thread, sockaddr *local, int recv_buf, int send_buf) +{ + ink_assert(local != nullptr); + auto con = new AcceptUDP2ConnectionImpl(*this, c, thread); + if (con->create_socket(local, recv_buf, send_buf) != 0) { + delete con; + return nullptr; + } + + ink_assert(con->start_io() >= 0); + return con; +} + +UDP2ConnectionImpl * +UDP2ConnectionManager::find_connection(sockaddr const *local, sockaddr const *peer) +{ + auto hash = ats_ip_port_hash(local) ^ ats_ip_port_hash(peer); + + auto it = this->_routes.find(hash); + if (it != this->_routes.end()) { + for (auto itt : it->second) { + auto local = itt->from(); + auto peer = itt->to(); + if (ats_ip_addr_port_eq(&local.sa, local) && ats_ip_addr_port_eq(&peer.sa, peer)) { + return itt; + } + } + } + return nullptr; +} + +void +UDP2ConnectionManager::close_connection(UDP2Connection *c, const char *line) +{ + this->_closed_queue.push(c); +} + +int +UDP2ConnectionManager::mainEvent(int event, void *data) +{ + // main routine for closed connections cleaning + SList(UDP2Connection, closed_link) aq(this->_closed_queue.popall()); + UDP2Connection *c; + while ((c = aq.pop())) { + auto local = c->from(); + auto peer = c->to(); + auto hash = ats_ip_port_hash(local) ^ ats_ip_port_hash(peer); + auto it = this->_routes.find(hash); + if (it != this->_routes.end()) { + for (auto itt = it->second.begin(); itt != it->second.end(); ++itt) { + if (*itt == c) { + it->second.erase(itt); + --this->_size; + delete c; + break; + } + } + + if (it->second.empty()) { + this->_routes.erase(it); + } + } + } + + return 0; +} diff --git a/iocore/net/UDPConnectionManager.h b/iocore/net/UDPConnectionManager.h new file mode 100644 index 00000000000..79729f4fee0 --- /dev/null +++ b/iocore/net/UDPConnectionManager.h @@ -0,0 +1,60 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#pragma once + +#include "UDPConnection.h" + +class UDP2ConnectionManager : public Continuation +{ +public: + // create udp connection. + UDP2ConnectionImpl *create_udp_connection(Continuation *con, EThread *ethread, sockaddr const *addr, sockaddr const *peer, + int recv_buf = 0, int send_buf = 0); + + // create an accept udp connection. + AcceptUDP2ConnectionImpl *create_accept_udp_connection(Continuation *c, EThread *thread, sockaddr *local, int recv_buf = 0, + int send_buf = 0); + + // UDP2Connection should removed by UDP2ConnectionManager + // Do not call delete to free UDP2ConnectionImpl + void close_connection(UDP2Connection *c, const char *line); + + int mainEvent(int event, void *data); + + UDP2ConnectionImpl *find_connection(sockaddr const *local, sockaddr const *peer); + + int + size() const + { + return this->_size; + } + + UDP2ConnectionManager(Ptr &mutex) : Continuation(mutex) { SET_HANDLER(&UDP2ConnectionManager::mainEvent); } + UDP2ConnectionManager(ProxyMutex *mutex) : Continuation(mutex) { SET_HANDLER(&UDP2ConnectionManager::mainEvent); } + +private: + // keep the closed connections, and delete it periodicly + ASLL(UDP2Connection, closed_link) _closed_queue; + + // 2-tuple (dest ip and dest port) routes. + std::unordered_map> _routes; + int _size = 0; +}; diff --git a/iocore/net/UDPPacket.h b/iocore/net/UDPPacket.h new file mode 100644 index 00000000000..5c99eaca9fb --- /dev/null +++ b/iocore/net/UDPPacket.h @@ -0,0 +1,52 @@ +/** @file + + ALPNSupport.cc provides implmentations for ALPNSupport methods + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#pragma once + +#include "tscore/ink_sock.h" +#include "tscore/ink_inet.h" +#include "I_EventSystem.h" + +#include + +class UDP2Connection; + +struct UDP2Packet { + UDP2Packet() = default; + UDP2Packet(const IpEndpoint &from, const IpEndpoint &to, Ptr &chain) : from(from), to(to), chain(chain) {} + UDP2Packet(sockaddr const *from, sockaddr *to, Ptr &chain) : chain(chain) + { + ats_ip_copy(&this->from, from); + ats_ip_copy(&this->to, to); + } + + ~UDP2Packet() { this->chain = nullptr; } + IpEndpoint from{}; + IpEndpoint to{}; + Ptr chain; + + SLINK(UDP2Packet, in_link); + SLINK(UDP2Packet, out_link); + LINK(UDP2Packet, link); +}; + +using UDP2PacketUPtr = std::unique_ptr; diff --git a/iocore/net/UDPProcessor.cc b/iocore/net/UDPProcessor.cc new file mode 100644 index 00000000000..617787e16be --- /dev/null +++ b/iocore/net/UDPProcessor.cc @@ -0,0 +1,85 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "UDPProcessor.h" +#include "UDPConnectionManager.h" +#include "P_Net.h" + +UDP2NetProcessor udp2Net; +EventType ET_UDP2; + +constexpr static int64_t UDP_MANAGER_PERIODIC = 200 * HRTIME_SECOND; + +void +initialize_thread_for_udp2_net(EThread *thread) +{ + NetHandler *nh = get_NetHandler(thread); + UDP2ConnectionManager *udp2_manager = get_UDP2ConnectionManager(thread); + + new (reinterpret_cast(nh)) NetHandler(); + new (reinterpret_cast(get_PollCont(thread))) PollCont(thread->mutex, nh); + new (reinterpret_cast(udp2_manager)) UDP2ConnectionManager(thread->mutex); + nh->mutex = new_ProxyMutex(); + nh->thread = thread; + + PollCont *pc = get_PollCont(thread); + PollDescriptor *pd = pc->pollDescriptor; + + memcpy(&nh->config, &NetHandler::global_config, sizeof(NetHandler::global_config)); + nh->configure_per_thread_values(); + + thread->schedule_every(udp2_manager, UDP_MANAGER_PERIODIC); + + thread->set_tail_handler(nh); + thread->ep = static_cast(ats_malloc(sizeof(EventIO))); + new (thread->ep) EventIO(); + thread->ep->type = EVENTIO_ASYNC_SIGNAL; +#if HAVE_EVENTFD + thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ); +#else + thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ); +#endif +} + +int +UDP2NetProcessor::start(int n_upd_threads, size_t stacksize) +{ + if (n_upd_threads < 1) { + return -1; + } + + if (unix_netProcessor.pollCont_offset < 0) { + unix_netProcessor.pollCont_offset = eventProcessor.allocate(sizeof(PollCont)); + } + + if (unix_netProcessor.netHandler_offset < 0) { + unix_netProcessor.netHandler_offset = eventProcessor.allocate(sizeof(NetHandler)); + } + + if (this->udp_connection_manager_offset < 0) { + this->udp_connection_manager_offset = eventProcessor.allocate(sizeof(UDP2ConnectionManager)); + } + + ET_UDP2 = eventProcessor.register_event_type("ET_UDP2"); + eventProcessor.schedule_spawn(&initialize_thread_for_udp2_net, ET_UDP2); + eventProcessor.spawn_event_threads(ET_UDP2, n_upd_threads, stacksize); + return 0; +} diff --git a/iocore/net/UDPProcessor.h b/iocore/net/UDPProcessor.h new file mode 100644 index 00000000000..af243e78476 --- /dev/null +++ b/iocore/net/UDPProcessor.h @@ -0,0 +1,43 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include "I_EventSystem.h" + +class UDP2ConnectionManager; + +class UDP2NetProcessor : public Processor +{ +public: + int start(int n_upd_threads, size_t stacksize) override; + + off_t udp_connection_manager_offset = -1; +}; + +extern EventType ET_UDP2; +extern UDP2NetProcessor udp2Net; + +static inline UDP2ConnectionManager * +get_UDP2ConnectionManager(EThread *t) +{ + return static_cast(ETHREAD_GET_PTR(t, udp2Net.udp_connection_manager_offset)); +} diff --git a/iocore/net/libinknet_stub.cc b/iocore/net/libinknet_stub.cc index 11fee253e2f..d55e7051612 100644 --- a/iocore/net/libinknet_stub.cc +++ b/iocore/net/libinknet_stub.cc @@ -47,7 +47,7 @@ DNSConnection::trigger() void StatPagesManager::register_http(char const *, Action *(*)(Continuation *, HTTPHdr *)) { - ink_assert(false); + // ink_assert(false); } #include "ParentSelection.h" diff --git a/iocore/net/test_UDPAcceptEcho.cc b/iocore/net/test_UDPAcceptEcho.cc new file mode 100644 index 00000000000..10339c893aa --- /dev/null +++ b/iocore/net/test_UDPAcceptEcho.cc @@ -0,0 +1,346 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include +#include +#include + +#include "tscore/I_Layout.h" +#include "tscore/TestBox.h" + +#include "I_EventSystem.h" +#include "I_Net.h" +#include "UDPConnection.h" +#include "UDPProcessor.h" +#include "records/I_RecProcess.h" +#include "RecordsConfig.h" +#include "UDPConnectionManager.h" + +#include "diags.i" + +static pid_t pid; + +void +signal_handler(int signum) +{ + std::exit(EXIT_SUCCESS); +} + +// StatPagesManager statPagesManager; +class CloseCont : public Continuation +{ +public: + int + mainEvent(int event, void *data) + { + signal_handler(0); + return 0; + } + + CloseCont() { SET_HANDLER(&CloseCont::mainEvent); } +}; + +static CloseCont close_cont; + +in_port_t port = 0; +int pfd[2]; // Pipe used to signal client with transient port. + +class EchoServer : public Continuation +{ +public: + int + mainEvent(int event, void *data) + { + switch (event) { + case NET_EVENT_DATAGRAM_CONNECT_SUCCESS: + this->_con = static_cast(data); + ink_release_assert(this->_con->is_connected()); + std::cout << "connect success" << std::endl; + break; + + case NET_EVENT_DATAGRAM_READ_READY: { + std::cout << "read ready event" << std::endl; + while (true) { + auto p = this->_con->recv(); + if (p == nullptr) { + return 0; + } + ink_release_assert(p != nullptr); + this->_con->send(p); + this->count++; + std::cout << "receive msg from echo: " << std::string(p->chain->start(), p->chain->read_avail()); + std::cout << " then send" << this->count << std::endl; + if (this->count == 2) { + this->_con->close(); + this->_con = nullptr; + this_ethread()->schedule_in(&close_cont, 100 * HRTIME_MSECOND); + return 0; + } + } + } + default: + break; + } + return 0; + } + EchoServer() { SET_HANDLER(&EchoServer::mainEvent); } + +private: + int count = 0; + UDP2ConnectionImpl *_con = nullptr; +}; + +class AcceptServer : public Continuation +{ +public: + int + createEvent(int event, void *data) + { + switch (event) { + case NET_EVENT_DATAGRAM_WRITE_READY: + return 0; + case EVENT_INTERVAL: + break; + default: + ink_assert(0); + return 0; + } + std::cout << "Accept woke up" << std::endl; + UDP2Packet *p = this->_packet; + auto t = eventProcessor.assign_thread(ET_NET); + ink_assert(this->_sub_con == nullptr); + this->_sub_con = this->_conn->create_sub_connection(this->_conn->from(), p->to, new EchoServer(), t); + // this->_sub_con->dispatch(this->_packet); + return 0; + } + + int + mainEvent(int event, void *data) + { + switch (event) { + case NET_EVENT_DATAGRAM_READ_READY: { + ink_assert(this->_conn == static_cast(data)); + auto p = this->_conn->recv(); + auto tmp = p->from; + p->from = p->to; + p->to = tmp; + this->_packet = p; + auto new_p = new UDP2Packet; + *new_p = *p; + this->_conn->send(new_p); + + std::cout << "receive msg from accept: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; + // waiting for client send all packet to accept socket's buffer + std::cout << "accept sleep" << std::endl; + SET_HANDLER(&AcceptServer::createEvent); + this_ethread()->schedule_in(this, HRTIME_MSECONDS(1)); + break; + } + case NET_EVENT_DATAGRAM_WRITE_READY: + break; + default: + ink_release_assert(0); + break; + } + return 0; + } + + AcceptServer() + { + SET_HANDLER(&AcceptServer::mainEvent); + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; + + auto udp_manager = get_UDP2ConnectionManager(eventProcessor.assign_thread(ET_UDP2)); + this->_conn = udp_manager->create_accept_udp_connection(this, eventProcessor.assign_thread(ET_UDP2), + reinterpret_cast(&addr)); + ink_release_assert(this->_conn != nullptr); + std::cout << "bind to port: " << ats_ip_port_host_order(this->_conn->from()) << std::endl; + int port = ats_ip_port_host_order(this->_conn->from()); + ink_release_assert(write(pfd[1], &port, sizeof(port)) == sizeof(port)); + this->mutex = this->_conn->mutex; + } + +private: + AcceptUDP2ConnectionImpl *_conn = nullptr; + UDP2ConnectionImpl *_sub_con = nullptr; + UDP2Packet *_packet = nullptr; +}; + +void +udp_client(TestBox &box) +{ + int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) { + std::cout << "Couldn't create socket" << std::endl; + std::exit(EXIT_FAILURE); + } + + const char payload[] = "helloword"; + const char payload1[] = "helloword1"; + const char payload2[] = "helloword2"; + + struct timeval tv; + tv.tv_sec = 20; + tv.tv_usec = 0; + + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&tv), sizeof(tv)); + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&tv), sizeof(tv)); + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = htons(port); + + auto bsend = [sock, addr](const char *payload) { + ssize_t n = sendto(sock, payload, strlen(payload), 0, + reinterpret_cast(const_cast(&addr)), sizeof(addr)); + if (n < 0) { + std::cout << "Couldn't send udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + }; + + auto brecv = [sock, box](const char *expect) -> bool { + char buf[128] = {0}; + ssize_t l = recv(sock, buf, sizeof(buf), 0); + if (l < 0) { + std::cout << "Couldn't recv udp packet" << std::endl; + close(sock); + const_cast(&box)->check(false, "errno recv"); + return false; + } + std::cout << "client recv payload: " << buf << std::endl; + const_cast(&box)->check(strncmp(buf, expect, sizeof(payload)) == 0, "echo doesn't match"); + if (strncmp(buf, expect, sizeof(payload))) { + kill(pid, SIGINT); + } + return strncmp(buf, expect, sizeof(payload)) == 0; + }; + +#define CHECK_RECV(statement) \ + do { \ + if (!statement) { \ + return; \ + } \ + } while (0) + + std::cout << "client send payload" << std::endl; + bsend(payload); // send payload to accept; + CHECK_RECV(brecv(payload)); // accept reply the payload + // CHECK_RECV(brecv(payload)); // sub udp connection send another one. + + // send to accept udp connection since we are sleeping in one second. + std::cout << "client send payload1" << std::endl; + bsend(payload1); // send to accept udp connection since we are sleeping in one second. + + std::cout << "client send payload2" << std::endl; + bsend(payload2); // send to accept udp again. + + // recv from sub udp connection + CHECK_RECV(brecv(payload1)); + CHECK_RECV(brecv(payload2)); + + close(sock); + return; +} + +void +udp_echo_server() +{ + Layout::create(); + RecModeT mode_type = RECM_STAND_ALONE; + RecProcessInit(mode_type); + + Thread *main_thread = new EThread(); + main_thread->set_specific(); + net_config_poll_timeout = 10; + RecProcessInit(RECM_STAND_ALONE); + LibRecordsConfigInit(); + ink_net_init(ts::ModuleVersion(1, 0, ts::ModuleVersion::PRIVATE)); + + // statPagesManager.init(); + init_diags("udp", nullptr); + ink_event_system_init(EVENT_SYSTEM_MODULE_PUBLIC_VERSION); + netProcessor.init(); + eventProcessor.start(1); + udp2Net.start(1, 1048576); + + initialize_thread_for_net(this_ethread()); + + signal(SIGPIPE, SIG_IGN); + signal(SIGTERM, signal_handler); + + AcceptServer *server = new AcceptServer; + (void)server; + + this_thread()->execute(); +} + +REGRESSION_TEST(UDPNet_echo)(RegressionTest *t, int /* atype ATS_UNUSED */, int *pstatus) +{ + TestBox box(t, pstatus); + box = REGRESSION_TEST_PASSED; + + int z = pipe(pfd); + if (z < 0) { + std::cout << "Unable to create pipe" << std::endl; + std::exit(EXIT_FAILURE); + } + + pid = fork(); + if (pid < 0) { + std::cout << "Couldn't fork" << std::endl; + std::exit(EXIT_FAILURE); + } else if (pid == 0) { + close(pfd[0]); + udp_echo_server(); + } else { + close(pfd[1]); + if (read(pfd[0], &port, sizeof(port)) <= 0) { + std::cout << "Failed to get signal with port data [" << errno << ']' << std::endl; + std::exit(EXIT_FAILURE); + } + Debug("udp_echo", "client get ports: %d", port); + udp_client(box); + + // kill(pid, SIGTERM); + int status; + wait(&status); + + if (WIFEXITED(status) && WEXITSTATUS(status) != 0) { + std::cout << "UDP Echo Server exit failure" << std::endl; + std::exit(EXIT_FAILURE); + } + } +} + +int +main(int /* argc ATS_UNUSED */, const char ** /* argv ATS_UNUSED */) +{ + RegressionTest::run("UDPNet", REGRESSION_TEST_QUICK); + return RegressionTest::final_status == REGRESSION_TEST_PASSED ? 0 : 1; +} From f0eb2e584b867762c9af4c8445c9661c6d6df19e Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 20 Jan 2020 09:12:56 +0800 Subject: [PATCH 02/11] simplify udp implementation --- iocore/net/Makefile.am | 3 +- iocore/net/UDPConnection.cc | 419 +++++++++-------------------- iocore/net/UDPConnection.h | 65 +---- iocore/net/UDPConnectionManager.cc | 126 --------- iocore/net/UDPConnectionManager.h | 60 ----- iocore/net/UDPProcessor.cc | 13 +- iocore/net/UDPProcessor.h | 8 - iocore/net/test_UDPAcceptEcho.cc | 137 +++------- 8 files changed, 172 insertions(+), 659 deletions(-) delete mode 100644 iocore/net/UDPConnectionManager.cc delete mode 100644 iocore/net/UDPConnectionManager.h diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am index aa4e82dd7cb..23586706373 100644 --- a/iocore/net/Makefile.am +++ b/iocore/net/Makefile.am @@ -165,8 +165,7 @@ libinknet_a_SOURCES = \ UnixUDPNet.cc \ SSLDynlock.cc \ UDPProcessor.cc \ - UDPConnection.cc \ - UDPConnectionManager.cc + UDPConnection.cc test_UDPAcceptEcho_CPPFLAGS = \ $(AM_CPPFLAGS) \ diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 9723db20ef7..3153aebf2a8 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -22,7 +22,6 @@ #include #include "UDPConnection.h" -#include "UDPConnectionManager.h" #include "tscore/ink_atomic.h" @@ -36,8 +35,6 @@ udp_event_name(UDP2ConnectionImpl::UDPEvents e) return "UDP_CONNECT_EVENT"; case UDP2ConnectionImpl::UDPEvents::UDP_USER_READ_READY: return "UDP_USER_READ_READY"; - case UDP2ConnectionImpl::UDPEvents::UDP_SEND_READY: - return "UDP_SEND_READY"; default: return "UNKNOWN EVENT"; }; @@ -80,8 +77,7 @@ write_reschedule(NetHandler *nh, NetEvent *vc) // // UDP2ConnectionImpl // -UDP2ConnectionImpl::UDP2ConnectionImpl(UDP2ConnectionManager &manager, Continuation *con, EThread *thread) - : _con(con), _thread(thread), _manager(manager) +UDP2ConnectionImpl::UDP2ConnectionImpl(Continuation *con, EThread *thread) : _con(con), _thread(thread) { this->mutex = con->mutex; this->read.enabled = 1; // read enabled is always true because we expected all data; @@ -112,8 +108,7 @@ UDP2ConnectionImpl::free(EThread *t) Debug("udp_con", "free connection"); this->mutex = nullptr; - this->_user_read_ready_event.close(); - this->_send_ready_event.close(); + this->_close_event(UDPEvents::UDP_USER_READ_READY); this->_close_event(UDPEvents::UDP_START_EVENT); this->_close_event(UDPEvents::UDP_CONNECT_EVENT); @@ -131,7 +126,7 @@ UDP2ConnectionImpl::free(EThread *t) ::close(fd); } - this->_manager.close_connection(this, ""); + delete this; } int @@ -169,13 +164,8 @@ UDP2ConnectionImpl::close() this->_con = nullptr; this->mutex = this->_thread->mutex; - SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); - UDP2Packet *p; - while ((p = aq.pop())) { - delete p; - } - - if (this->_send_list.empty() && this->_send_queue.empty()) { + this->_recv_list.clear(); + if (this->_send_list.empty()) { this->free(nullptr); return 0; } @@ -226,7 +216,7 @@ UDP2ConnectionImpl::startEvent(int event, void *data) SET_HANDLER(&UDP2ConnectionImpl::mainEvent); ink_assert(nh->startIO(this) >= 0); // reenable read since there might be some packets in socket's buffer. - if (!this->_recv_list.empty() || !this->_recv_queue.empty()) { + if (!this->_recv_list.empty()) { this->callback(NET_EVENT_DATAGRAM_READ_READY, this); } break; @@ -240,7 +230,7 @@ UDP2ConnectionImpl::startEvent(int event, void *data) break; } - if (this->_is_closed() && (this->_send_queue.empty() && this->_send_list.empty())) { + if (this->_is_closed() && this->_send_list.empty()) { this->free(nullptr); } else if (this->_is_closed()) { this->_reenable(&this->write.vio); @@ -258,16 +248,13 @@ UDP2ConnectionImpl::mainEvent(int event, void *data) case UDPEvents::UDP_CONNECT_EVENT: this->connect(&this->_to.sa); break; - case UDPEvents::UDP_USER_READ_READY: - this->callback(NET_EVENT_DATAGRAM_READ_READY, this); - break; default: Debug("udp_con", "unknown events: %d", event); ink_release_assert(0); break; } - if (this->_is_closed() && (this->_send_queue.empty() && this->_send_list.empty())) { + if (this->_is_closed() && this->_send_list.empty()) { this->free(nullptr); } else if (this->_is_closed()) { this->_reenable(&this->write.vio); @@ -463,11 +450,8 @@ UDP2ConnectionImpl::_reschedule(UDPEvents e, void *data, int64_t delay) event = &this->_connect_event; break; case UDPEvents::UDP_USER_READ_READY: - this->_user_read_ready_event.schedule(this, this->_thread, static_cast(e), data, delay); - return; - case UDPEvents::UDP_SEND_READY: - this->_send_ready_event.schedule(this, this->_thread, static_cast(e), data, delay); - return; + event = &this->_user_read_ready_event; + break; default: ink_release_assert(!"unknown events"); break; @@ -485,18 +469,6 @@ UDP2ConnectionImpl::_reschedule(UDPEvents e, void *data, int64_t delay) } } -void -UDP2ConnectionImpl::reschedule_read() -{ - this->_reschedule(UDPEvents::UDP_USER_READ_READY, nullptr, 0); -} - -void -UDP2ConnectionImpl::reschedule_write() -{ - this->_reschedule(UDPEvents::UDP_SEND_READY, nullptr, 0); -} - void UDP2ConnectionImpl::_close_event(int e) { @@ -515,13 +487,8 @@ UDP2ConnectionImpl::_close_event(UDPEvents e) ptr = &this->_connect_event; break; case UDPEvents::UDP_USER_READ_READY: - ink_assert(this->_thread == this_ethread()); - this->_user_read_ready_event.cancel(); - return; - case UDPEvents::UDP_SEND_READY: - ink_assert(this->_thread == this_ethread()); - this->_send_ready_event.cancel(); - return; + ptr = &this->_user_read_ready_event; + break; default: ink_release_assert(!"unknown ptrs"); break; @@ -598,7 +565,7 @@ UDP2ConnectionImpl::_read_from_net(NetHandler *nh, EThread *thread, bool callbac } UDP2PacketUPtr p = std::make_unique(); - r = this->_read(tiovec, niov, p->from, p->to); + r = this->is_connected() ? this->_read(tiovec, niov, p->from, p->to) : this->_readmsg(tiovec, niov, p->from, p->to); if (r <= 0) { if (r == -EAGAIN || r == -ENOTCONN) { this->read.triggered = 0; @@ -635,8 +602,7 @@ UDP2ConnectionImpl::_read_from_net(NetHandler *nh, EThread *thread, bool callbac std::cout << "fuck read: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; // queue onto the UDPConnection - this->_recv_queue.push(p.get()); - p.release(); + this->_recv_list.push_back(std::move(p)); // reload the unused block chain = next_chain; @@ -646,12 +612,75 @@ UDP2ConnectionImpl::_read_from_net(NetHandler *nh, EThread *thread, bool callbac Debug("udp_con", "read %d packets from net", count); - if (callback && (!this->_recv_queue.empty() || !this->_recv_list.empty())) { + if (callback && !this->_recv_list.empty()) { this->callback(NET_EVENT_DATAGRAM_READ_READY, this); } return; } +int +UDP2ConnectionImpl::_readmsg(struct iovec *iov, int len, IpEndpoint &fromaddr, IpEndpoint &toaddr) +{ + struct msghdr msg; + int toaddr_len = sizeof(toaddr); + char *cbuf[1024]; + msg.msg_name = &fromaddr.sin6; + msg.msg_namelen = sizeof(fromaddr); + msg.msg_iov = iov; + msg.msg_iovlen = len; + msg.msg_control = cbuf; + msg.msg_controllen = sizeof(cbuf); + + int rc = socketManager.recvmsg(this->get_fd(), &msg, 0); + if (rc <= 0) { + return rc; + } + + // truncated check + if (msg.msg_flags & MSG_TRUNC) { + Debug("udp-read", "The UDP packet is truncated"); + ink_assert(!"truncate should not happen, if so please increase MAX_NIOV"); + rc = -0x12345; + return rc; + } + + safe_getsockname(this->get_fd(), &toaddr.sa, &toaddr_len); + for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + switch (cmsg->cmsg_type) { +#ifdef IP_PKTINFO + case IP_PKTINFO: + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); + reinterpret_cast(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr; + } + break; +#endif +#ifdef IP_RECVDSTADDR + case IP_RECVDSTADDR: + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_addr *addr = reinterpret_cast(CMSG_DATA(cmsg)); + reinterpret_cast(&toaddr)->sin_addr.s_addr = addr->s_addr; + } + break; +#endif +#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) + case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too + if (cmsg->cmsg_level == IPPROTO_IPV6) { + struct in6_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); + memcpy(toaddr.sin6.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16); + } + break; +#endif + } + } + + char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; + Debug("udp_accept", "read packet %s ----> %s", ats_ip_nptop(&fromaddr.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), + ats_ip_nptop(&toaddr.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN)); + ink_release_assert(!ats_ip_addr_port_eq(&fromaddr.sa, &toaddr.sa)); + return rc; +} + int UDP2ConnectionImpl::_read(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to) { @@ -689,22 +718,11 @@ UDP2ConnectionImpl::net_write_io(NetHandler *nh, EThread *thread) return; } - SList(UDP2Packet, out_link) aq(this->_send_queue.popall()); - UDP2Packet *p; - Queue st; - while ((p = aq.pop())) { - st.push(p); - } - - while ((p = st.pop())) { - this->_send_list.push_back(UDP2PacketUPtr(p)); - } - int count = 0; while (!this->_send_list.empty()) { auto p = this->_send_list.front().get(); - int rc = this->_send(p); + int rc = this->is_connected() ? this->_send(p) : this->_sendmsg(p); if (rc >= 0) { count++; std::cout << "fuck send: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; @@ -726,7 +744,7 @@ UDP2ConnectionImpl::net_write_io(NetHandler *nh, EThread *thread) this->callback(NET_EVENT_DATAGRAM_WRITE_READY, this); } - if (this->_is_closed() && (this->_send_queue.empty() && this->_send_list.empty())) { + if (this->_is_closed() && this->_send_list.empty()) { this->free(nullptr); } @@ -755,34 +773,59 @@ UDP2ConnectionImpl::_send(UDP2Packet *p) return -errno; } -UDP2Packet * -UDP2ConnectionImpl::recv() +int +UDP2ConnectionImpl::_sendmsg(UDP2Packet *p) { - // user should call recv immediatly when UDP2Connection callback to - // contiuation. Since the mutex is already grabed from eventsystem or - // NetHandler, we don't need explicit take lock here. - ink_assert(!this->_is_closed()); - ink_assert(this->mutex->thread_holding == this->_thread); - SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); - UDP2Packet *t; - Queue st; - while ((t = aq.pop())) { - st.push(t); + ink_assert(p->to.isValid()); + ink_assert(this->is_connected() == false); + struct msghdr msg; + struct iovec iov[MAX_NIOV]; + int real_len = 0; + int n, iov_len = 0; + +#if !defined(solaris) + msg.msg_control = nullptr; + msg.msg_controllen = 0; + msg.msg_flags = 0; +#endif + msg.msg_name = reinterpret_cast(&p->to.sa); + msg.msg_namelen = ats_ip_size(p->to); + iov_len = 0; + + for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { + iov[iov_len].iov_base = static_cast(b->start()); + iov[iov_len].iov_len = b->size(); + real_len += iov[iov_len].iov_len; + iov_len++; } - while ((t = st.pop())) { - this->_recv_list.push_back(UDP2PacketUPtr(t)); + msg.msg_iov = iov; + msg.msg_iovlen = iov_len; + + n = socketManager.sendmsg(this->get_fd(), &msg, 0); + if (n >= 0) { + char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; + Debug("udp_accept", "send packet %s ----> %s", ats_ip_nptop(&p->from.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), + ats_ip_nptop(&p->to.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN)); + return n; } + Debug("udp_conn", "send from external thread failed: %d-%s", errno, strerror(errno)); + return -errno; +} + +UDP2PacketUPtr +UDP2ConnectionImpl::recv() +{ + ink_assert(!this->_is_closed()); + ink_assert(this->mutex->thread_holding == this->_thread); if (this->_recv_list.empty()) { return nullptr; } auto p = std::move(this->_recv_list.front()); this->_recv_list.pop_front(); - auto ret = p.get(); - p.release(); - return ret; + return p; } void @@ -857,27 +900,11 @@ UDP2ConnectionImpl::_reenable(VIO *vio) } int -UDP2ConnectionImpl::dispatch(UDP2Packet *packet) -{ - if (this->_is_closed()) { - return 0; - } - - this->_recv_queue.push(packet); - if (this->_thread == this_ethread()) { - this->callback(NET_EVENT_DATAGRAM_READ_READY, nullptr); - return 0; - } - - return 0; -} - -int -UDP2ConnectionImpl::send(UDP2Packet *p, bool flush) +UDP2ConnectionImpl::send(UDP2PacketUPtr p, bool flush) { ink_assert(!this->_is_closed()); ink_assert(this->is_connected() || p->to.isValid()); - this->_send_queue.push(p); + this->_send_list.push_back(std::move(p)); if (flush) { this->_reenable(&this->write.vio); } @@ -889,201 +916,3 @@ UDP2ConnectionImpl::flush() { this->_reenable(&this->write.vio); } - -// -// AcceptUDP2ConnectionImpl -// -UDP2ConnectionImpl * -AcceptUDP2ConnectionImpl::create_sub_connection(const IpEndpoint &local, const IpEndpoint &peer, Continuation *c, EThread *thread) -{ - ink_assert(this->mutex->thread_holding == this->_thread); - ink_assert(peer.isValid()); - char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; - - auto con = this->_manager.create_udp_connection(c, thread, local, peer); - ink_release_assert(con->is_connected()); - - // since new socket is established, we should drain all udp packet which is already in accept udp - // socket buffer. So read from socket until the EAGIN. - this->_read_from_net(nh, this->_thread, false); - - // in this time every packets read from accept will dispatch to new conn - // So we need to take old packet which already in list. - SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); - Queue st; - UDP2Packet *p; - while ((p = aq.pop())) { - st.push(p); - } - - while ((p = st.pop())) { - this->_recv_list.push_back(UDP2PacketUPtr(p)); - } - - // unique set list. - // std::set awakenup_list; - int count = 0; - UDP2ConnectionImpl *tmp = nullptr; - for (auto it = this->_recv_list.begin(); it != this->_recv_list.end();) { - if ((tmp = this->_manager.find_connection(&(*it)->to.sa, &(*it)->from.sa))) { - tmp->dispatch((*it).get()); - (*it).release(); - it = this->_recv_list.erase(it); - count++; - // awakenup_list.emplace(tmp); - } else { - it++; - } - } - - // for (auto it : awakenup_list) { - // it->reschedule_read(); - // if (it->nh) { - // it->nh->signalActivity(); - // } - // } - ink_assert(con->start_io() >= 0); - - Debug("udp_accept", "create udp connection %s ----> %s dispatch %d keep packets: %lu", - ats_ip_nptop(&local.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), - ats_ip_nptop(&peer.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN), count, this->_recv_list.size()); - return con; -} - -void -AcceptUDP2ConnectionImpl::net_read_io(NetHandler *nh, EThread *t) -{ - this->_read_from_net(nh, this->_thread, false); - - SList(UDP2Packet, in_link) aq(this->_recv_queue.popall()); - Queue st; - UDP2Packet *p; - while ((p = aq.pop())) { - st.push(p); - } - - std::set awakenup_list; - UDP2ConnectionImpl *con = nullptr; - while ((p = st.pop())) { - if ((con = this->_manager.find_connection(&p->to.sa, &p->from.sa))) { - con->dispatch(p); - awakenup_list.emplace(con); - continue; - } - - this->_recv_list.push_back(UDP2PacketUPtr(p)); - } - - for (auto it : awakenup_list) { - it->reschedule_read(); - } - - Debug("udp_accept", "dispatch %lu packets to other connection, manager size: %d", awakenup_list.size(), this->_manager.size()); - if (!this->_recv_queue.empty() || !this->_recv_list.empty()) { - this->callback(NET_EVENT_DATAGRAM_READ_READY, this); - } -} - -int -AcceptUDP2ConnectionImpl::_send(UDP2Packet *p) -{ - ink_assert(p->to.isValid()); - ink_assert(this->is_connected() == false); - struct msghdr msg; - struct iovec iov[MAX_NIOV]; - int real_len = 0; - int n, iov_len = 0; - -#if !defined(solaris) - msg.msg_control = nullptr; - msg.msg_controllen = 0; - msg.msg_flags = 0; -#endif - msg.msg_name = reinterpret_cast(&p->to.sa); - msg.msg_namelen = ats_ip_size(p->to); - iov_len = 0; - - for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { - iov[iov_len].iov_base = static_cast(b->start()); - iov[iov_len].iov_len = b->size(); - real_len += iov[iov_len].iov_len; - iov_len++; - } - - msg.msg_iov = iov; - msg.msg_iovlen = iov_len; - - n = socketManager.sendmsg(this->get_fd(), &msg, 0); - if (n >= 0) { - char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; - Debug("udp_accept", "send packet %s ----> %s", ats_ip_nptop(&p->from.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), - ats_ip_nptop(&p->to.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN)); - return n; - } - - Debug("udp_conn", "send from external thread failed: %d-%s", errno, strerror(errno)); - return -errno; -} - -int -AcceptUDP2ConnectionImpl::_read(struct iovec *iov, int len, IpEndpoint &fromaddr, IpEndpoint &toaddr) -{ - struct msghdr msg; - int toaddr_len = sizeof(toaddr); - char *cbuf[1024]; - msg.msg_name = &fromaddr.sin6; - msg.msg_namelen = sizeof(fromaddr); - msg.msg_iov = iov; - msg.msg_iovlen = len; - msg.msg_control = cbuf; - msg.msg_controllen = sizeof(cbuf); - - int rc = socketManager.recvmsg(this->get_fd(), &msg, 0); - if (rc <= 0) { - return rc; - } - - // truncated check - if (msg.msg_flags & MSG_TRUNC) { - Debug("udp-read", "The UDP packet is truncated"); - ink_assert(!"truncate should not happen, if so please increase MAX_NIOV"); - rc = -0x12345; - return rc; - } - - safe_getsockname(this->get_fd(), &toaddr.sa, &toaddr_len); - for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - switch (cmsg->cmsg_type) { -#ifdef IP_PKTINFO - case IP_PKTINFO: - if (cmsg->cmsg_level == IPPROTO_IP) { - struct in_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); - reinterpret_cast(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr; - } - break; -#endif -#ifdef IP_RECVDSTADDR - case IP_RECVDSTADDR: - if (cmsg->cmsg_level == IPPROTO_IP) { - struct in_addr *addr = reinterpret_cast(CMSG_DATA(cmsg)); - reinterpret_cast(&toaddr)->sin_addr.s_addr = addr->s_addr; - } - break; -#endif -#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) - case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too - if (cmsg->cmsg_level == IPPROTO_IPV6) { - struct in6_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); - memcpy(toaddr.sin6.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16); - } - break; -#endif - } - } - - char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; - Debug("udp_accept", "read packet %s ----> %s", ats_ip_nptop(&fromaddr.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), - ats_ip_nptop(&toaddr.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN)); - ink_release_assert(!ats_ip_addr_port_eq(&fromaddr.sa, &toaddr.sa)); - return rc; -} diff --git a/iocore/net/UDPConnection.h b/iocore/net/UDPConnection.h index 9c0aa98cb2e..edc77f5b2d6 100644 --- a/iocore/net/UDPConnection.h +++ b/iocore/net/UDPConnection.h @@ -33,23 +33,18 @@ #define NET_EVENT_DATAGRAM_CONNECT_ERROR (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) #define NET_EVENT_DATAGRAM_WRITE_READY (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) -class AcceptUDP2ConnectionImpl; -class UDP2ConnectionManager; - class UDP2Connection : public NetEvent { public: virtual ~UDP2Connection() {} - virtual int send(UDP2Packet *, bool flush = true) = 0; - virtual UDP2Packet *recv() = 0; - virtual void flush() = 0; + virtual int send(UDP2PacketUPtr p, bool flush = true) = 0; + virtual UDP2PacketUPtr recv() = 0; + virtual void flush() = 0; virtual int close() = 0; virtual void set_continuation(Continuation *con) = 0; virtual IpEndpoint from() = 0; virtual IpEndpoint to() = 0; - - SLINK(UDP2Connection, closed_link); }; class UDP2ConnectionImpl : public UDP2Connection, public Continuation @@ -57,14 +52,13 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation public: UDP2ConnectionImpl() = delete; // independent allocate. - UDP2ConnectionImpl(UDP2ConnectionManager &manager, Continuation *con, EThread *ethread = nullptr); + UDP2ConnectionImpl(Continuation *con, EThread *ethread = nullptr); ~UDP2ConnectionImpl(); enum class UDPEvents : uint8_t { UDP_START_EVENT, UDP_CONNECT_EVENT, UDP_USER_READ_READY, - UDP_SEND_READY, }; // NetEventHandler @@ -81,9 +75,9 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation int start_io(); // UDP2Connection - int send(UDP2Packet *packet, bool flush = true) override; + int send(UDP2PacketUPtr packet, bool flush = true) override; void flush() override; - UDP2Packet *recv() override; + UDP2PacketUPtr recv() override; IpEndpoint from() override; IpEndpoint to() override; void set_continuation(Continuation *con) override; @@ -92,9 +86,6 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation int connect(sockaddr const *addr); bool is_connected() const; void bind_thread(EThread *thread); - int dispatch(UDP2Packet *packet); - void reschedule_read(); - void reschedule_write(); int startEvent(int event, void *data); int mainEvent(int event, void *data); @@ -108,19 +99,14 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation void _reenable(VIO *vio); void _read_from_net(NetHandler *nh, EThread *t, bool callback = true); virtual int _send(UDP2Packet *p); + virtual int _sendmsg(UDP2Packet *p); virtual int _read(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to); - - ASLL(UDP2Packet, in_link) _recv_queue; - std::deque _recv_list; + virtual int _readmsg(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to); Continuation *_con = nullptr; EThread *_thread = nullptr; - UDP2ConnectionManager &_manager; - private: - ASLL(UDP2Packet, out_link) _send_queue; - // internal schedule. void _close_event(UDPEvents e); void _close_event(int e); @@ -129,39 +115,16 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation IpEndpoint _from{}; IpEndpoint _to{}; - int _fd = -1; - bool _connected = false; - Event *_start_event = nullptr; - Event *_connect_event = nullptr; - AtomicEvent _send_ready_event; - AtomicEvent _user_read_ready_event; + int _fd = -1; + bool _connected = false; + Event *_start_event = nullptr; + Event *_connect_event = nullptr; + Event *_user_read_ready_event = nullptr; // TODO removed NetVCOptions _options{}; ContFlags _cont_flags{}; + std::deque _recv_list; std::deque _send_list; }; - -// Accept UDP2Connection is ranning in ET_UDP, and dispatch UDPPacket to different sub connections in _connection_map -// So PacketHandler should manager all AcceptUDP2Connection to find the correct connection across diffierent listen -// Addrs. -// FIXME: In current implementable, every AcceptUDP2ConnectionImpl are independent. That means the client should always -// send to the same local addr. -class AcceptUDP2ConnectionImpl : public UDP2ConnectionImpl -{ -public: - AcceptUDP2ConnectionImpl(UDP2ConnectionManager &manager, Continuation *c, EThread *thread) - : UDP2ConnectionImpl(manager, c, thread) - { - } - - AcceptUDP2ConnectionImpl() = delete; - UDP2ConnectionImpl *create_sub_connection(const IpEndpoint &from, const IpEndpoint &to, Continuation *c, EThread *thread); - - void net_read_io(NetHandler *nh, EThread *lthread) override; - -private: - int _send(UDP2Packet *p) override; - int _read(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to) override; -}; diff --git a/iocore/net/UDPConnectionManager.cc b/iocore/net/UDPConnectionManager.cc deleted file mode 100644 index 82e466d64d5..00000000000 --- a/iocore/net/UDPConnectionManager.cc +++ /dev/null @@ -1,126 +0,0 @@ -/** @file - - @section license License - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -#include "UDPConnectionManager.h" - -UDP2ConnectionImpl * -UDP2ConnectionManager::create_udp_connection(Continuation *c, EThread *ethread, sockaddr const *local, sockaddr const *peer, - int recv_buf, int send_buf) -{ - ink_release_assert(this->mutex->thread_holding == this_ethread()); - auto hash = ats_ip_port_hash(local) ^ ats_ip_port_hash(peer); - auto it = this->_routes.find(hash); - if (it != this->_routes.end()) { - for (auto itt : it->second) { - auto local = itt->from(); - auto peer = itt->to(); - if (ats_ip_addr_port_eq(&local.sa, local) && ats_ip_addr_port_eq(&peer.sa, peer)) { - return itt; - } - } - } - - // not found - ink_assert(local != nullptr); - ink_assert(peer != nullptr); - auto con = new UDP2ConnectionImpl(*this, c, ethread); - if (con->create_socket(local, recv_buf, send_buf) != 0) { - delete con; - return nullptr; - } - - if (con->connect(peer) < 0) { - delete con; - return nullptr; - } - - ++this->_size; - this->_routes.emplace(hash, std::list(1, con)); - return con; -} - -AcceptUDP2ConnectionImpl * -UDP2ConnectionManager::create_accept_udp_connection(Continuation *c, EThread *thread, sockaddr *local, int recv_buf, int send_buf) -{ - ink_assert(local != nullptr); - auto con = new AcceptUDP2ConnectionImpl(*this, c, thread); - if (con->create_socket(local, recv_buf, send_buf) != 0) { - delete con; - return nullptr; - } - - ink_assert(con->start_io() >= 0); - return con; -} - -UDP2ConnectionImpl * -UDP2ConnectionManager::find_connection(sockaddr const *local, sockaddr const *peer) -{ - auto hash = ats_ip_port_hash(local) ^ ats_ip_port_hash(peer); - - auto it = this->_routes.find(hash); - if (it != this->_routes.end()) { - for (auto itt : it->second) { - auto local = itt->from(); - auto peer = itt->to(); - if (ats_ip_addr_port_eq(&local.sa, local) && ats_ip_addr_port_eq(&peer.sa, peer)) { - return itt; - } - } - } - return nullptr; -} - -void -UDP2ConnectionManager::close_connection(UDP2Connection *c, const char *line) -{ - this->_closed_queue.push(c); -} - -int -UDP2ConnectionManager::mainEvent(int event, void *data) -{ - // main routine for closed connections cleaning - SList(UDP2Connection, closed_link) aq(this->_closed_queue.popall()); - UDP2Connection *c; - while ((c = aq.pop())) { - auto local = c->from(); - auto peer = c->to(); - auto hash = ats_ip_port_hash(local) ^ ats_ip_port_hash(peer); - auto it = this->_routes.find(hash); - if (it != this->_routes.end()) { - for (auto itt = it->second.begin(); itt != it->second.end(); ++itt) { - if (*itt == c) { - it->second.erase(itt); - --this->_size; - delete c; - break; - } - } - - if (it->second.empty()) { - this->_routes.erase(it); - } - } - } - - return 0; -} diff --git a/iocore/net/UDPConnectionManager.h b/iocore/net/UDPConnectionManager.h deleted file mode 100644 index 79729f4fee0..00000000000 --- a/iocore/net/UDPConnectionManager.h +++ /dev/null @@ -1,60 +0,0 @@ -/** @file - - @section license License - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -#pragma once - -#include "UDPConnection.h" - -class UDP2ConnectionManager : public Continuation -{ -public: - // create udp connection. - UDP2ConnectionImpl *create_udp_connection(Continuation *con, EThread *ethread, sockaddr const *addr, sockaddr const *peer, - int recv_buf = 0, int send_buf = 0); - - // create an accept udp connection. - AcceptUDP2ConnectionImpl *create_accept_udp_connection(Continuation *c, EThread *thread, sockaddr *local, int recv_buf = 0, - int send_buf = 0); - - // UDP2Connection should removed by UDP2ConnectionManager - // Do not call delete to free UDP2ConnectionImpl - void close_connection(UDP2Connection *c, const char *line); - - int mainEvent(int event, void *data); - - UDP2ConnectionImpl *find_connection(sockaddr const *local, sockaddr const *peer); - - int - size() const - { - return this->_size; - } - - UDP2ConnectionManager(Ptr &mutex) : Continuation(mutex) { SET_HANDLER(&UDP2ConnectionManager::mainEvent); } - UDP2ConnectionManager(ProxyMutex *mutex) : Continuation(mutex) { SET_HANDLER(&UDP2ConnectionManager::mainEvent); } - -private: - // keep the closed connections, and delete it periodicly - ASLL(UDP2Connection, closed_link) _closed_queue; - - // 2-tuple (dest ip and dest port) routes. - std::unordered_map> _routes; - int _size = 0; -}; diff --git a/iocore/net/UDPProcessor.cc b/iocore/net/UDPProcessor.cc index 617787e16be..1f50141c5f9 100644 --- a/iocore/net/UDPProcessor.cc +++ b/iocore/net/UDPProcessor.cc @@ -20,23 +20,18 @@ */ #include "UDPProcessor.h" -#include "UDPConnectionManager.h" #include "P_Net.h" UDP2NetProcessor udp2Net; EventType ET_UDP2; -constexpr static int64_t UDP_MANAGER_PERIODIC = 200 * HRTIME_SECOND; - void initialize_thread_for_udp2_net(EThread *thread) { - NetHandler *nh = get_NetHandler(thread); - UDP2ConnectionManager *udp2_manager = get_UDP2ConnectionManager(thread); + NetHandler *nh = get_NetHandler(thread); new (reinterpret_cast(nh)) NetHandler(); new (reinterpret_cast(get_PollCont(thread))) PollCont(thread->mutex, nh); - new (reinterpret_cast(udp2_manager)) UDP2ConnectionManager(thread->mutex); nh->mutex = new_ProxyMutex(); nh->thread = thread; @@ -46,8 +41,6 @@ initialize_thread_for_udp2_net(EThread *thread) memcpy(&nh->config, &NetHandler::global_config, sizeof(NetHandler::global_config)); nh->configure_per_thread_values(); - thread->schedule_every(udp2_manager, UDP_MANAGER_PERIODIC); - thread->set_tail_handler(nh); thread->ep = static_cast(ats_malloc(sizeof(EventIO))); new (thread->ep) EventIO(); @@ -74,10 +67,6 @@ UDP2NetProcessor::start(int n_upd_threads, size_t stacksize) unix_netProcessor.netHandler_offset = eventProcessor.allocate(sizeof(NetHandler)); } - if (this->udp_connection_manager_offset < 0) { - this->udp_connection_manager_offset = eventProcessor.allocate(sizeof(UDP2ConnectionManager)); - } - ET_UDP2 = eventProcessor.register_event_type("ET_UDP2"); eventProcessor.schedule_spawn(&initialize_thread_for_udp2_net, ET_UDP2); eventProcessor.spawn_event_threads(ET_UDP2, n_upd_threads, stacksize); diff --git a/iocore/net/UDPProcessor.h b/iocore/net/UDPProcessor.h index af243e78476..074acee98f9 100644 --- a/iocore/net/UDPProcessor.h +++ b/iocore/net/UDPProcessor.h @@ -29,15 +29,7 @@ class UDP2NetProcessor : public Processor { public: int start(int n_upd_threads, size_t stacksize) override; - - off_t udp_connection_manager_offset = -1; }; extern EventType ET_UDP2; extern UDP2NetProcessor udp2Net; - -static inline UDP2ConnectionManager * -get_UDP2ConnectionManager(EThread *t) -{ - return static_cast(ETHREAD_GET_PTR(t, udp2Net.udp_connection_manager_offset)); -} diff --git a/iocore/net/test_UDPAcceptEcho.cc b/iocore/net/test_UDPAcceptEcho.cc index 10339c893aa..839a607461d 100644 --- a/iocore/net/test_UDPAcceptEcho.cc +++ b/iocore/net/test_UDPAcceptEcho.cc @@ -34,11 +34,13 @@ #include "UDPProcessor.h" #include "records/I_RecProcess.h" #include "RecordsConfig.h" -#include "UDPConnectionManager.h" #include "diags.i" static pid_t pid; +const char payload[] = "helloword"; +const char payload1[] = "helloword1"; +const char payload2[] = "helloword2"; void signal_handler(int signum) @@ -46,117 +48,45 @@ signal_handler(int signum) std::exit(EXIT_SUCCESS); } -// StatPagesManager statPagesManager; -class CloseCont : public Continuation -{ -public: - int - mainEvent(int event, void *data) - { - signal_handler(0); - return 0; - } - - CloseCont() { SET_HANDLER(&CloseCont::mainEvent); } -}; - -static CloseCont close_cont; - in_port_t port = 0; int pfd[2]; // Pipe used to signal client with transient port. -class EchoServer : public Continuation +class AcceptServer : public Continuation { public: int mainEvent(int event, void *data) { switch (event) { - case NET_EVENT_DATAGRAM_CONNECT_SUCCESS: - this->_con = static_cast(data); - ink_release_assert(this->_con->is_connected()); - std::cout << "connect success" << std::endl; - break; - case NET_EVENT_DATAGRAM_READ_READY: { - std::cout << "read ready event" << std::endl; + ink_assert(this->_con == static_cast(data)); while (true) { auto p = this->_con->recv(); if (p == nullptr) { return 0; } - ink_release_assert(p != nullptr); - this->_con->send(p); - this->count++; - std::cout << "receive msg from echo: " << std::string(p->chain->start(), p->chain->read_avail()); - std::cout << " then send" << this->count << std::endl; - if (this->count == 2) { - this->_con->close(); - this->_con = nullptr; - this_ethread()->schedule_in(&close_cont, 100 * HRTIME_MSECOND); - return 0; + + if (!this->_first) { + this->_first = true; + ink_release_assert(this->_con->connect(&p->from.sa) >= 0); } + + this->_closed = std::string(p->chain->start(), p->chain->read_avail()) == payload2; + std::cout << "receive msg from accept: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; + auto tmp = p->from; + p->from = p->to; + p->to = tmp; + this->_con->send(std::move(p)); } - } - default: break; } - return 0; - } - EchoServer() { SET_HANDLER(&EchoServer::mainEvent); } - -private: - int count = 0; - UDP2ConnectionImpl *_con = nullptr; -}; - -class AcceptServer : public Continuation -{ -public: - int - createEvent(int event, void *data) - { - switch (event) { case NET_EVENT_DATAGRAM_WRITE_READY: - return 0; - case EVENT_INTERVAL: - break; - default: - ink_assert(0); - return 0; - } - std::cout << "Accept woke up" << std::endl; - UDP2Packet *p = this->_packet; - auto t = eventProcessor.assign_thread(ET_NET); - ink_assert(this->_sub_con == nullptr); - this->_sub_con = this->_conn->create_sub_connection(this->_conn->from(), p->to, new EchoServer(), t); - // this->_sub_con->dispatch(this->_packet); - return 0; - } - - int - mainEvent(int event, void *data) - { - switch (event) { - case NET_EVENT_DATAGRAM_READ_READY: { - ink_assert(this->_conn == static_cast(data)); - auto p = this->_conn->recv(); - auto tmp = p->from; - p->from = p->to; - p->to = tmp; - this->_packet = p; - auto new_p = new UDP2Packet; - *new_p = *p; - this->_conn->send(new_p); - - std::cout << "receive msg from accept: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; - // waiting for client send all packet to accept socket's buffer - std::cout << "accept sleep" << std::endl; - SET_HANDLER(&AcceptServer::createEvent); - this_ethread()->schedule_in(this, HRTIME_MSECONDS(1)); + if (this->_closed) { + std::cout << "accept exit" << std::endl; + signal_handler(0); + } break; - } - case NET_EVENT_DATAGRAM_WRITE_READY: + case NET_EVENT_DATAGRAM_CONNECT_SUCCESS: break; default: ink_release_assert(0); @@ -173,20 +103,20 @@ class AcceptServer : public Continuation addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); addr.sin_port = 0; - auto udp_manager = get_UDP2ConnectionManager(eventProcessor.assign_thread(ET_UDP2)); - this->_conn = udp_manager->create_accept_udp_connection(this, eventProcessor.assign_thread(ET_UDP2), - reinterpret_cast(&addr)); - ink_release_assert(this->_conn != nullptr); - std::cout << "bind to port: " << ats_ip_port_host_order(this->_conn->from()) << std::endl; - int port = ats_ip_port_host_order(this->_conn->from()); + this->_con = new UDP2ConnectionImpl(this, eventProcessor.assign_thread(ET_UDP2)); + ink_release_assert(this->_con->create_socket(reinterpret_cast(&addr)) >= 0); + ink_release_assert(this->_con->start_io() >= 0); + ink_release_assert(this->_con != nullptr); + std::cout << "bind to port: " << ats_ip_port_host_order(this->_con->from()) << std::endl; + int port = ats_ip_port_host_order(this->_con->from()); ink_release_assert(write(pfd[1], &port, sizeof(port)) == sizeof(port)); - this->mutex = this->_conn->mutex; + this->mutex = this->_con->mutex; } private: - AcceptUDP2ConnectionImpl *_conn = nullptr; - UDP2ConnectionImpl *_sub_con = nullptr; - UDP2Packet *_packet = nullptr; + UDP2ConnectionImpl *_con = nullptr; + bool _first = false; + bool _closed = false; }; void @@ -198,10 +128,6 @@ udp_client(TestBox &box) std::exit(EXIT_FAILURE); } - const char payload[] = "helloword"; - const char payload1[] = "helloword1"; - const char payload2[] = "helloword2"; - struct timeval tv; tv.tv_sec = 20; tv.tv_usec = 0; @@ -264,6 +190,7 @@ udp_client(TestBox &box) CHECK_RECV(brecv(payload1)); CHECK_RECV(brecv(payload2)); + std::cout << "client exit" << std::endl; close(sock); return; } From f4c2b920e7bf3c24bf52e75db8ec5919ab9f10ff Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 20 Jan 2020 10:06:40 +0800 Subject: [PATCH 03/11] flush udp connection --- iocore/net/UDPConnection.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 3153aebf2a8..afad326294e 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -906,7 +906,7 @@ UDP2ConnectionImpl::send(UDP2PacketUPtr p, bool flush) ink_assert(this->is_connected() || p->to.isValid()); this->_send_list.push_back(std::move(p)); if (flush) { - this->_reenable(&this->write.vio); + this->flush(); } return 0; } @@ -915,4 +915,5 @@ void UDP2ConnectionImpl::flush() { this->_reenable(&this->write.vio); + this->nh->signalActivity(); } From 9d3fcb8f03e9d959dd64a3af499e8cf8fef9cac7 Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 20 Jan 2020 10:18:29 +0800 Subject: [PATCH 04/11] make udp send thread safe --- iocore/net/UDPConnection.cc | 14 +++++++++++++- iocore/net/UDPConnection.h | 2 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index afad326294e..21fd21a2d7e 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -718,6 +718,17 @@ UDP2ConnectionImpl::net_write_io(NetHandler *nh, EThread *thread) return; } + SList(UDP2Packet, link) aq(this->_external_send_list.popall()); + UDP2Packet *tp; + Queue tmp; + while ((tp = aq.pop())) { + tmp.push(tp); + } + + while ((tp = tmp.pop())) { + this->_send_list.push_back(UDP2PacketUPtr(tp)); + } + int count = 0; while (!this->_send_list.empty()) { auto p = this->_send_list.front().get(); @@ -904,7 +915,8 @@ UDP2ConnectionImpl::send(UDP2PacketUPtr p, bool flush) { ink_assert(!this->_is_closed()); ink_assert(this->is_connected() || p->to.isValid()); - this->_send_list.push_back(std::move(p)); + this->_external_send_list.push(p.get()); + p.release(); if (flush) { this->flush(); } diff --git a/iocore/net/UDPConnection.h b/iocore/net/UDPConnection.h index edc77f5b2d6..e9f5b53f748 100644 --- a/iocore/net/UDPConnection.h +++ b/iocore/net/UDPConnection.h @@ -106,6 +106,8 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation Continuation *_con = nullptr; EThread *_thread = nullptr; + ASLL(UDP2Packet, link) _external_send_list; + private: // internal schedule. void _close_event(UDPEvents e); From 41a7a7d84f5ff795912c1d186c51a369b9cc46ae Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 20 Jan 2020 10:24:07 +0800 Subject: [PATCH 05/11] make udp send thread safe --- iocore/net/UDPConnection.cc | 14 ++++++++++---- iocore/net/UDPConnection.h | 1 + 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 21fd21a2d7e..5531a09f219 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -165,7 +165,7 @@ UDP2ConnectionImpl::close() this->mutex = this->_thread->mutex; this->_recv_list.clear(); - if (this->_send_list.empty()) { + if (this->_is_send_complete()) { this->free(nullptr); return 0; } @@ -230,7 +230,7 @@ UDP2ConnectionImpl::startEvent(int event, void *data) break; } - if (this->_is_closed() && this->_send_list.empty()) { + if (this->_is_closed() && this->_is_send_complete()) { this->free(nullptr); } else if (this->_is_closed()) { this->_reenable(&this->write.vio); @@ -254,7 +254,7 @@ UDP2ConnectionImpl::mainEvent(int event, void *data) break; } - if (this->_is_closed() && this->_send_list.empty()) { + if (this->_is_closed() && this->_is_send_complete()) { this->free(nullptr); } else if (this->_is_closed()) { this->_reenable(&this->write.vio); @@ -755,13 +755,19 @@ UDP2ConnectionImpl::net_write_io(NetHandler *nh, EThread *thread) this->callback(NET_EVENT_DATAGRAM_WRITE_READY, this); } - if (this->_is_closed() && this->_send_list.empty()) { + if (this->_is_closed() && this->_is_send_complete()) { this->free(nullptr); } return; } +bool +UDP2ConnectionImpl::_is_send_complete() +{ + return this->_send_list.empty() && this->_external_send_list.empty(); +} + int UDP2ConnectionImpl::_send(UDP2Packet *p) { diff --git a/iocore/net/UDPConnection.h b/iocore/net/UDPConnection.h index e9f5b53f748..d41dfaaf23d 100644 --- a/iocore/net/UDPConnection.h +++ b/iocore/net/UDPConnection.h @@ -102,6 +102,7 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation virtual int _sendmsg(UDP2Packet *p); virtual int _read(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to); virtual int _readmsg(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to); + bool _is_send_complete(); Continuation *_con = nullptr; EThread *_thread = nullptr; From 54cfd50af9d27123c1bdef71db337b1887f73c29 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 18 Feb 2020 08:53:48 +0000 Subject: [PATCH 06/11] remove hack debug msg --- iocore/net/UDPConnection.cc | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 5531a09f219..a41a8a5bb55 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -334,9 +334,9 @@ UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_b } // If this is a class D address (i.e. multicast address), use REUSEADDR. - // if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(SOCKOPT_ON), sizeof(int)) < 0)) { - // goto Lerror; - // } + if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(SOCKOPT_ON), sizeof(int)) < 0)) { + goto Lerror; + } if (ats_is_ip6(addr) && (res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { goto Lerror; @@ -595,11 +595,6 @@ UDP2ConnectionImpl::_read_from_net(NetHandler *nh, EThread *thread, bool callbac } p->chain = chain; - std::string str(p->chain->start(), p->chain->read_avail()); - if (str == "helloword1") { - std::cout << "mother fuck" << std::endl; - } - std::cout << "fuck read: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; // queue onto the UDPConnection this->_recv_list.push_back(std::move(p)); @@ -736,7 +731,6 @@ UDP2ConnectionImpl::net_write_io(NetHandler *nh, EThread *thread) int rc = this->is_connected() ? this->_send(p) : this->_sendmsg(p); if (rc >= 0) { count++; - std::cout << "fuck send: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; this->_send_list.pop_front(); continue; } From 453074e51f881f4fef8c7fa887bda39762af05b0 Mon Sep 17 00:00:00 2001 From: scw00 Date: Thu, 27 Feb 2020 11:23:18 +0800 Subject: [PATCH 07/11] split bind call for udp binding --- iocore/net/UDPConnection.cc | 49 +++++++++++++++++++++----------- iocore/net/UDPConnection.h | 3 +- iocore/net/test_UDPAcceptEcho.cc | 3 +- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index a41a8a5bb55..2eca4ae595d 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -271,13 +271,11 @@ UDP2ConnectionImpl::start_io() } int -UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_buf) +UDP2ConnectionImpl::create_socket(int family, int recv_buf, int send_buf) { int res = 0; int fd = -1; - IpEndpoint local_addr{}; - int local_addr_len = sizeof(local_addr); - if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0) { + if ((res = socketManager.socket(family, SOCK_DGRAM, 0)) < 0) { goto Lerror; } @@ -297,7 +295,7 @@ UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_b } } - if (addr->sa_family == AF_INET) { + if (family == AF_INET) { bool succeeded = false; int enable = 1; #ifdef IP_PKTINFO @@ -314,7 +312,7 @@ UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_b Debug("udp_con", "setsockeopt for pktinfo failed"); goto Lerror; } - } else if (addr->sa_family == AF_INET6) { + } else if (family == AF_INET6) { bool succeeded = false; int enable = 1; #ifdef IPV6_PKTINFO @@ -327,6 +325,11 @@ UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_b succeeded = true; } #endif + if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { + Debug("udp_con", "safe_setsockopt error IPPROTO_IPV6"); + goto Lerror; + } + if (!succeeded) { Debug("udp_con", "setsockeopt for pktinfo failed"); goto Lerror; @@ -338,35 +341,47 @@ UDP2ConnectionImpl::create_socket(sockaddr const *addr, int recv_buf, int send_b goto Lerror; } - if (ats_is_ip6(addr) && (res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { + if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, SOCKOPT_ON, sizeof(int))) < 0) { goto Lerror; } - if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, SOCKOPT_ON, sizeof(int))) < 0) { - goto Lerror; + this->_fd = fd; + Debug("udp_con", "creating a udp socket family = %d---success", family); + return 0; +Lerror: + Debug("udp_con", "creating a udp socket family = %d---soft failure", family); + if (fd != -1) { + socketManager.close(fd); } - if (-1 == socketManager.ink_bind(fd, addr, ats_ip_size(addr))) { + return -errno; +} + +int +UDP2ConnectionImpl::bind(sockaddr const *addr) +{ + int res = 0; + int local_addr_len = sizeof(this->_from); + if (-1 == socketManager.ink_bind(this->_fd, addr, ats_ip_size(addr))) { char buff[INET6_ADDRPORTSTRLEN]; Debug("udp_con", "ink bind failed on %s %s", ats_ip_nptop(addr, buff, sizeof(buff)), strerror(errno)); goto Lerror; } - if ((res = safe_getsockname(fd, &local_addr.sa, &local_addr_len)) < 0) { + if ((res = safe_getsockname(this->_fd, &this->_from.sa, &local_addr_len)) < 0) { Debug("udp_con", "CreateUdpsocket: getsockname didn't work"); goto Lerror; } - ats_ip_copy(&this->_from, &local_addr.sa); - this->_fd = fd; - Debug("udp_con", "creating a udp socket port = %d---success", ats_ip_port_host_order(local_addr)); + Debug("udp_con", "bind udp socket port = %d---success", ats_ip_port_host_order(addr)); return 0; Lerror: - Debug("udp_con", "creating a udp socket port = %d---soft failure", ats_ip_port_host_order(local_addr)); - if (fd != -1) { - socketManager.close(fd); + Debug("udp_con", "creating a udp socket port = %d---soft failure", ats_ip_port_host_order(addr)); + if (this->_fd != -1) { + socketManager.close(this->_fd); } + this->_fd = -1; return -errno; } diff --git a/iocore/net/UDPConnection.h b/iocore/net/UDPConnection.h index d41dfaaf23d..6593675bd9b 100644 --- a/iocore/net/UDPConnection.h +++ b/iocore/net/UDPConnection.h @@ -82,7 +82,8 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation IpEndpoint to() override; void set_continuation(Continuation *con) override; - int create_socket(sockaddr const *addr, int recv_buf = 0, int send_buf = 0); + int create_socket(int family, int recv_buf = 0, int send_buf = 0); + int bind(sockaddr const *addr); int connect(sockaddr const *addr); bool is_connected() const; void bind_thread(EThread *thread); diff --git a/iocore/net/test_UDPAcceptEcho.cc b/iocore/net/test_UDPAcceptEcho.cc index 839a607461d..ef27b696073 100644 --- a/iocore/net/test_UDPAcceptEcho.cc +++ b/iocore/net/test_UDPAcceptEcho.cc @@ -104,7 +104,8 @@ class AcceptServer : public Continuation addr.sin_port = 0; this->_con = new UDP2ConnectionImpl(this, eventProcessor.assign_thread(ET_UDP2)); - ink_release_assert(this->_con->create_socket(reinterpret_cast(&addr)) >= 0); + ink_release_assert(this->_con->create_socket(AF_INET) >= 0); + ink_release_assert(this->_con->bind(reinterpret_cast(&addr)) >= 0); ink_release_assert(this->_con->start_io() >= 0); ink_release_assert(this->_con != nullptr); std::cout << "bind to port: " << ats_ip_port_host_order(this->_con->from()) << std::endl; From d07c28735a0286a73fb6d36771ed05454a538fd5 Mon Sep 17 00:00:00 2001 From: scw00 Date: Fri, 28 Feb 2020 16:18:29 +0800 Subject: [PATCH 08/11] add set_args api --- iocore/net/UDPConnection.h | 44 +++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/iocore/net/UDPConnection.h b/iocore/net/UDPConnection.h index 6593675bd9b..0355d44e430 100644 --- a/iocore/net/UDPConnection.h +++ b/iocore/net/UDPConnection.h @@ -27,27 +27,14 @@ #include "P_Net.h" #include "NetEvent.h" #include "UDPPacket.h" -#include "AtomicEvent.h" #define NET_EVENT_DATAGRAM_CONNECT_SUCCESS (NET_EVENT_EVENTS_START + 170) #define NET_EVENT_DATAGRAM_CONNECT_ERROR (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) #define NET_EVENT_DATAGRAM_WRITE_READY (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) -class UDP2Connection : public NetEvent -{ -public: - virtual ~UDP2Connection() {} - virtual int send(UDP2PacketUPtr p, bool flush = true) = 0; - virtual UDP2PacketUPtr recv() = 0; - virtual void flush() = 0; - - virtual int close() = 0; - virtual void set_continuation(Continuation *con) = 0; - virtual IpEndpoint from() = 0; - virtual IpEndpoint to() = 0; -}; +class Continuation; -class UDP2ConnectionImpl : public UDP2Connection, public Continuation +class UDP2ConnectionImpl : public Continuation, public NetEvent { public: UDP2ConnectionImpl() = delete; @@ -74,13 +61,12 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation ContFlags &get_control_flags() override; int start_io(); - // UDP2Connection - int send(UDP2PacketUPtr packet, bool flush = true) override; - void flush() override; - UDP2PacketUPtr recv() override; - IpEndpoint from() override; - IpEndpoint to() override; - void set_continuation(Continuation *con) override; + int send(UDP2PacketUPtr packet, bool flush = true); + void flush(); + UDP2PacketUPtr recv(); + IpEndpoint from(); + IpEndpoint to(); + void set_continuation(Continuation *con); int create_socket(int family, int recv_buf = 0, int send_buf = 0); int bind(sockaddr const *addr); @@ -91,6 +77,18 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation int startEvent(int event, void *data); int mainEvent(int event, void *data); + void + set_data(void *data) + { + this->_data = data; + } + + void * + get_data() + { + return this->_data; + } + protected: // control max data size per read, This can be calculated as MAX_NIOV * 1024 / read static constexpr int MAX_NIOV = 1; @@ -131,4 +129,6 @@ class UDP2ConnectionImpl : public UDP2Connection, public Continuation std::deque _recv_list; std::deque _send_list; + + void *_data = nullptr; }; From 51bbfb2a6851c3cac995b4e0eb0bcd15ef808346 Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 9 Mar 2020 09:41:24 +0800 Subject: [PATCH 09/11] UDP: add default socket --- iocore/net/UDPConnection.cc | 7 ++++++- iocore/net/UDPConnection.h | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 2eca4ae595d..1bb9b8defbb 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -77,7 +77,7 @@ write_reschedule(NetHandler *nh, NetEvent *vc) // // UDP2ConnectionImpl // -UDP2ConnectionImpl::UDP2ConnectionImpl(Continuation *con, EThread *thread) : _con(con), _thread(thread) +UDP2ConnectionImpl::UDP2ConnectionImpl(Continuation *con, EThread *thread, int fd) : _con(con), _thread(thread), _fd(fd) { this->mutex = con->mutex; this->read.enabled = 1; // read enabled is always true because we expected all data; @@ -275,6 +275,11 @@ UDP2ConnectionImpl::create_socket(int family, int recv_buf, int send_buf) { int res = 0; int fd = -1; + + if (this->_fd != -1) { + return 0; + } + if ((res = socketManager.socket(family, SOCK_DGRAM, 0)) < 0) { goto Lerror; } diff --git a/iocore/net/UDPConnection.h b/iocore/net/UDPConnection.h index 0355d44e430..90c0ac1ab84 100644 --- a/iocore/net/UDPConnection.h +++ b/iocore/net/UDPConnection.h @@ -39,7 +39,7 @@ class UDP2ConnectionImpl : public Continuation, public NetEvent public: UDP2ConnectionImpl() = delete; // independent allocate. - UDP2ConnectionImpl(Continuation *con, EThread *ethread = nullptr); + UDP2ConnectionImpl(Continuation *con, EThread *ethread = nullptr, int fd = -1); ~UDP2ConnectionImpl(); enum class UDPEvents : uint8_t { From 2193827e17435996bc08bcddd6022e8b54610604 Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 9 Mar 2020 09:54:30 +0800 Subject: [PATCH 10/11] Fix IPV6 bug --- iocore/net/UDPConnection.cc | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 1bb9b8defbb..488244f4ce9 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -330,11 +330,6 @@ UDP2ConnectionImpl::create_socket(int family, int recv_buf, int send_buf) succeeded = true; } #endif - if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { - Debug("udp_con", "safe_setsockopt error IPPROTO_IPV6"); - goto Lerror; - } - if (!succeeded) { Debug("udp_con", "setsockeopt for pktinfo failed"); goto Lerror; @@ -367,6 +362,12 @@ UDP2ConnectionImpl::bind(sockaddr const *addr) { int res = 0; int local_addr_len = sizeof(this->_from); + + if (addr->sa_family == AF_INET6 && (res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { + Debug("udp_con", "safe_setsockopt error IPPROTO_IPV6"); + goto Lerror; + } + if (-1 == socketManager.ink_bind(this->_fd, addr, ats_ip_size(addr))) { char buff[INET6_ADDRPORTSTRLEN]; Debug("udp_con", "ink bind failed on %s %s", ats_ip_nptop(addr, buff, sizeof(buff)), strerror(errno)); From d4110e5254a3c11373cf78d50685cec6ec1cde98 Mon Sep 17 00:00:00 2001 From: scw00 Date: Mon, 9 Mar 2020 09:58:38 +0800 Subject: [PATCH 11/11] fix compile error --- iocore/net/UDPConnection.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc index 488244f4ce9..d197defeeb1 100644 --- a/iocore/net/UDPConnection.cc +++ b/iocore/net/UDPConnection.cc @@ -363,7 +363,7 @@ UDP2ConnectionImpl::bind(sockaddr const *addr) int res = 0; int local_addr_len = sizeof(this->_from); - if (addr->sa_family == AF_INET6 && (res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { + if (addr->sa_family == AF_INET6 && (res = safe_setsockopt(this->_fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { Debug("udp_con", "safe_setsockopt error IPPROTO_IPV6"); goto Lerror; }