-
Notifications
You must be signed in to change notification settings - Fork 99
MINIFICPP-2701 ListenUDP udp.sender.port property #2084
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
- added tryDequeue overload with optional return type to MinifiConcurrentQueue - renamed port / server_port to remote and local - added remote port to Message, adapted all the dependent files - added udp.sender.port output attribute to ListenUDP - extended test utility to capture the local endpoint after UDP send - extended ListenUDPTests to test that the new property is set appropriately
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds support for capturing and exposing the remote (sender) port in UDP messages by extending the ListenUDP processor with a new udp.sender.port output attribute. The changes involve refactoring the Message struct to track both local and remote endpoint information (address and port) and updating all consumers of this struct throughout the codebase.
Key changes include:
- Added
remote_portfield toutils::net::Messagestruct and renamed fields for clarity (sender_address→remote_address,server_port→local_port) - Introduced new
udp.sender.portoutput attribute to ListenUDP processor to expose the sending port - Added
tryDequeue()overload toMinifiConcurrentQueuethat returnsstd::optional<T>for cleaner API usage
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| extension-framework/include/utils/net/Message.h | Refactored Message struct to include both remote and local port information with clearer naming |
| core-framework/include/utils/MinifiConcurrentQueue.h | Added tryDequeue() overload returning std::optional |
| extension-framework/include/utils/net/Server.h | Updated tryDequeue signature to return optional and added deleted copy/move constructors |
| extension-framework/include/utils/net/TcpServer.h | Updated session method signatures to accept remote_port parameter |
| extension-framework/src/utils/net/TcpServer.cpp | Updated to extract and pass remote port through session handlers |
| extension-framework/src/utils/net/UdpServer.cpp | Updated Message construction to include sender port |
| extensions/standard-processors/processors/ListenUDP.h | Added SenderPort output attribute definition |
| extensions/standard-processors/processors/ListenUDP.cpp | Updated to set all three attributes including new udp.sender.port |
| extensions/standard-processors/processors/ListenTCP.cpp | Updated to use renamed Message fields |
| extensions/standard-processors/processors/ListenSyslog.cpp | Updated to use renamed Message fields |
| extensions/standard-processors/processors/GetTCP.h | Updated TcpClient::tryDequeue to return optional; added deleted copy/move constructors |
| extensions/standard-processors/processors/GetTCP.cpp | Updated to use optional-returning tryDequeue; contains critical bug at line 208 |
| extensions/standard-processors/processors/NetworkListenerProcessor.h | Added deleted copy/move constructors for safety |
| extensions/standard-processors/processors/NetworkListenerProcessor.cpp | Updated to use optional-returning tryDequeue |
| libminifi/test/libtest/unit/TestUtils.h | Added UdpNetworkSendResult struct to capture local endpoint after send |
| libminifi/test/libtest/unit/TestUtils.cpp | Updated sendUdpDatagram to return result struct with local endpoint |
| extensions/standard-processors/tests/unit/ListenUDPTests.cpp | Extended tests to verify udp.sender.port attribute is set correctly |
| extensions/standard-processors/tests/unit/PutUDPTests.cpp | Updated to use new optional-returning tryDequeue and UdpNetworkSendResult |
| extensions/standard-processors/tests/unit/PutTCPTests.cpp | Updated to use optional-returning tryDequeue and pass remote_port to sessions |
| extensions/standard-processors/tests/unit/ListenSyslogTests.cpp | Updated to access .ec field from UdpNetworkSendResult |
| PROCESSORS.md | Updated ListenUDP documentation to include new udp.sender.port attribute |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
The local_endpoint is incorrectly assigned to remote_endpoint(). It should be assigned to local_endpoint() to correctly capture the local port information. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
| public: | ||
| Message() = default; | ||
| Message(std::string message_data, IpProtocol protocol, asio::ip::address sender_address, asio::ip::port_type server_port) | ||
| Message() = delete; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the old default constructor left a few members uninitialized. It used to be needed because objects were assigned from an "empty" state through out params. Refactored those instances, so that no more default constructed "empty" messages are necessary.
| Server(const Server&) = delete; | ||
| Server(Server&&) = delete; | ||
| Server& operator=(const Server&) = delete; | ||
| Server& operator=(Server&&) = delete; | ||
| virtual ~Server() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rule of 5 (same with TcpClient and NetworkListenerProcessor)
https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#rc-five
|
|
||
| bool queueEmpty() const; | ||
| bool tryDequeue(utils::net::Message& received_message); | ||
| std::optional<utils::net::Message> tryDequeue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional return values are cleaner than status bool return + conditionally assigned out param, plus it allows us to remove the default empty state from the possible states of the class, simplifying code.
|
|
||
| struct UdpNetworkSendResult { | ||
| std::error_code ec; | ||
| std::optional<asio::ip::udp::endpoint> local_endpoint = std::nullopt; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returning the local endpoint after a send from an unbound socket lets us check that the proper port is set on the receiving ListenUDP end: the local endpoint of the sending socket is the remote/sender endpoint of the listener socket, and before/during the send, the socket is automatically bound to a random port by the OS, just like with TCP connects.
lordgamez
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, all around good improvements 👍
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically main)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.