Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions modules/test/conn/bin/get_packet_counts.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/bin/bash

# Check if MAC address and pcap file arguments are provided
if [ -z "$1" ] || [ -z "$2" ]; then
echo "Usage: $0 <MAC_ADDRESS> <PCAP_FILE>"
exit 1
fi

# Assign MAC address and pcap file from arguments
PCAP_FILE="$1"
MAC_ADDRESS="$2"

# Check if the pcap file exists
if [ ! -f "$PCAP_FILE" ]; then
echo "Error: File $PCAP_FILE does not exist."
exit 1
fi

# Count multicast packets from the MAC address
multicast_from_count=$(tshark -r "$PCAP_FILE" -Y "(eth.dst[0] == 1) && eth.src == $MAC_ADDRESS" -T fields -e frame.number | wc -l)

# Count multicast packets to the MAC address
multicast_to_count=$(tshark -r "$PCAP_FILE" -Y "(eth.dst[0] == 1) && eth.dst == $MAC_ADDRESS" -T fields -e frame.number | wc -l)

# Count broadcast packets from the MAC address (broadcast MAC address is FF:FF:FF:FF:FF:FF)
broadcast_from_count=$(tshark -r "$PCAP_FILE" -Y "eth.dst == ff:ff:ff:ff:ff:ff && eth.src == $MAC_ADDRESS" -T fields -e frame.number | wc -l)

# Count broadcast packets to the MAC address
broadcast_to_count=$(tshark -r "$PCAP_FILE" -Y "eth.dst == ff:ff:ff:ff:ff:ff && eth.dst == $MAC_ADDRESS" -T fields -e frame.number | wc -l)

# Count unicast packets from the MAC address
unicast_from_count=$(tshark -r "$PCAP_FILE" -Y "eth.dst != ff:ff:ff:ff:ff:ff && (eth.dst[0] & 1) == 0 && eth.src == $MAC_ADDRESS" -T fields -e frame.number | wc -l)

# Count unicast packets to the MAC address
unicast_to_count=$(tshark -r "$PCAP_FILE" -Y "eth.dst != ff:ff:ff:ff:ff:ff && (eth.dst[0] & 1) == 0 && eth.dst == $MAC_ADDRESS" -T fields -e frame.number | wc -l)

# Output the results as a JSON object
echo "{
\"mac_address\": \"$MAC_ADDRESS\",
\"multicast\": {
\"from\": $multicast_from_count,
\"to\": $multicast_to_count
},
\"broadcast\": {
\"from\": $broadcast_from_count,
\"to\": $broadcast_to_count
},
\"unicast\": {
\"from\": $unicast_from_count,
\"to\": $unicast_to_count
}
}"
5 changes: 5 additions & 0 deletions modules/test/conn/conf/module_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@
"Enable ping response to IPv6 ICMP requests in network manager settings",
"Create a firewall exception to allow ICMPv6 via LAN"
]
},
{
"name": "communication.network.type",
"test_description": "How does the devie communicate (flow type) - Unicast, multicast broadcast?",
"expected_behavior": "Informational - One or more of these flow types are used"
}
]
}
Expand Down
8 changes: 7 additions & 1 deletion modules/test/conn/conn.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ARG GRPC_PROTO_DIR=/testrun/python/src/grpc/proto/dhcp
ARG GRPC_PROTO_FILE="grpc.proto"

# Install all necessary packages
RUN apt-get install -y wget
RUN apt-get install -y wget tshark

# Load the requirements file
COPY $MODULE_DIR/python/requirements.txt /testrun/python
Expand All @@ -35,5 +35,11 @@ COPY $MODULE_DIR/conf /testrun/conf
# Copy over all binary files
COPY $MODULE_DIR/bin /testrun/bin

# Remove incorrect line endings
RUN dos2unix /testrun/bin/*

# Make sure all the bin files are executable
RUN chmod u+x /testrun/bin/*

# Copy over all python files
COPY $MODULE_DIR/python /testrun/python
87 changes: 77 additions & 10 deletions modules/test/conn/python/src/connection_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
from host.client import Client as HostClient
from dhcp_util import DHCPUtil
from port_stats_util import PortStatsUtil
import json

LOG_NAME = 'test_connection'
OUI_FILE = '/usr/local/etc/oui.txt'
DEFAULT_BIN_DIR = '/testrun/bin'
STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap'
MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap'
DHCP_CAPTURE_FILE = '/runtime/network/dhcp-1.pcap'
Expand All @@ -47,7 +49,8 @@ def __init__(self,
conf_file=None,
results_dir=None,
startup_capture_file=STARTUP_CAPTURE_FILE,
monitor_capture_file=MONITOR_CAPTURE_FILE):
monitor_capture_file=MONITOR_CAPTURE_FILE,
bin_dir=DEFAULT_BIN_DIR):

super().__init__(module_name=module,
log_name=LOG_NAME,
Expand All @@ -63,6 +66,7 @@ def __init__(self,
self.host_client = HostClient()
self._dhcp_util = DHCPUtil(self.dhcp1_client, self.dhcp2_client, LOGGER)
self._lease_wait_time_sec = LEASE_WAIT_TIME_DEFAULT
self._bin_dir = bin_dir

# ToDo: Move this into some level of testing, leave for
# reference until tests are implemented with these calls
Expand Down Expand Up @@ -204,9 +208,9 @@ def _connection_dhcp_address(self):
return False, 'No IP information found in lease: ' + self._device_mac
else:
LOGGER.info('No DHCP lease could be found for MAC ' + self._device_mac +
' at the time of this test')
' at the time of this test')
return (False, 'No DHCP lease could be found for MAC ' +
self._device_mac + ' at the time of this test')
self._device_mac + ' at the time of this test')

def _connection_mac_address(self):
LOGGER.info('Running connection.mac_address')
Expand Down Expand Up @@ -325,9 +329,9 @@ def _connection_ipaddr_ip_change(self, config):
result = None, 'Failed to create reserved lease for device'
else:
LOGGER.info('Device has no current DHCP lease so ' +
'this test could not be run')
'this test could not be run')
result = None, ('Device has no current DHCP lease so ' +
'this test could not be run')
'this test could not be run')
# Restore the network
self._dhcp_util.restore_failover_dhcp_server()
LOGGER.info('Waiting 30 seconds for reserved lease to expire')
Expand Down Expand Up @@ -380,8 +384,9 @@ def _connection_ipaddr_dhcp_failover(self, config):
else:
result = False, 'Device did not respond to ping'
else:
result = (None,
'Device has no current DHCP lease so this test could not be run')
result = (
None,
'Device has no current DHCP lease so this test could not be run')
else:
LOGGER.error('Network is not ready for this test. Skipping')
result = None, 'Network is not ready for this test'
Expand Down Expand Up @@ -673,6 +678,67 @@ def setup_single_dhcp_server(self):
else:
return False, 'Secondary DHCP server stop command failed'

def _communication_network_type(self):
try:
result = 'Informational'
description = ''
details = ''
packets = self.get_network_packet_types()
details = packets
# Initialize a list for detected packet types
packet_types = []

# Check for the presence of each packet type and append to the list
if (packets['multicast']['from'] > 0) or (packets['multicast']['to'] > 0):
packet_types.append('Multicast')
if (packets['broadcast']['from'] > 0) or (packets['broadcast']['to'] > 0):
packet_types.append('Broadcast')
if (packets['unicast']['from'] > 0) or (packets['unicast']['to'] > 0):
packet_types.append('Unicast')

# Construct the description if any packet types were detected
if packet_types:
description = 'Packet types detected: ' + ', '.join(packet_types)
else:
description = 'No multicast, broadcast or unicast detected'

except Exception as e: # pylint: disable=W0718
LOGGER.error(e)
result = 'Error'
return result, description, details

def get_network_packet_types(self):
combined_results = {
'mac_address': self._device_mac,
'multicast': {
'from': 0,
'to': 0
},
'broadcast': {
'from': 0,
'to': 0
},
'unicast': {
'from': 0,
'to': 0
},
}
capture_files = [self.startup_capture_file, self.monitor_capture_file]
for capture_file in capture_files:
bin_file = self._bin_dir + '/get_packet_counts.sh'
args = f'"{capture_file}" "{self._device_mac}"'
command = f'{bin_file} {args}'
response = util.run_command(command)
packets = json.loads(response[0].strip())
# Combine results
combined_results['multicast']['from'] += packets['multicast']['from']
combined_results['multicast']['to'] += packets['multicast']['to']
combined_results['broadcast']['from'] += packets['broadcast']['from']
combined_results['broadcast']['to'] += packets['broadcast']['to']
combined_results['unicast']['from'] += packets['unicast']['from']
combined_results['unicast']['to'] += packets['unicast']['to']
return combined_results

def enable_failover(self):
# Move primary DHCP server to primary failover
LOGGER.info('Configuring primary failover DHCP server')
Expand Down Expand Up @@ -734,9 +800,10 @@ def _run_subnet_test(self, config):
results = self.test_subnets(ranges)
else:
LOGGER.info('Device has no current DHCP lease ' +
'so this test could not be run')
return (None,
'Device has no current DHCP lease so this test could not be run')
'so this test could not be run')
return (
None,
'Device has no current DHCP lease so this test could not be run')
else:
LOGGER.error(dhcp_setup[1])
return None, 'Failed to setup DHCP server for test'
Expand Down
41 changes: 37 additions & 4 deletions testing/unit/conn/conn_module_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

LOGGER = None


class ConnectionModuleTest(unittest.TestCase):
"""Contains and runs all the unit tests concerning Connection
module behaviors"""
Expand Down Expand Up @@ -133,13 +134,42 @@ def connection_port_speed_autonegotiation_fail_test(self):
def connection_switch_dhcp_snooping_icmp_test(self):
LOGGER.info('connection_switch_dhcp_snooping_icmp_test')
conn_module = ConnectionModule(module=MODULE,
results_dir=OUTPUT_DIR,
startup_capture_file=STARTUP_CAPTURE_FILE,
monitor_capture_file=MONITOR_CAPTURE_FILE)
result = conn_module._connection_switch_dhcp_snooping() # pylint: disable=W0212
results_dir=OUTPUT_DIR,
startup_capture_file=STARTUP_CAPTURE_FILE,
monitor_capture_file=MONITOR_CAPTURE_FILE)
result = conn_module._connection_switch_dhcp_snooping() # pylint: disable=W0212
LOGGER.info(result)
self.assertEqual(result[0], True)

def communication_network_type_test(self):
LOGGER.info('communication_network_type_test')
conn_module = ConnectionModule(module=MODULE,
results_dir=OUTPUT_DIR,
startup_capture_file=STARTUP_CAPTURE_FILE,
monitor_capture_file=MONITOR_CAPTURE_FILE)
result = conn_module._communication_network_type() # pylint: disable=W0212
details_expected = {
'mac_address': '98:f0:7b:d1:87:06',
'multicast': {
'from': 11,
'to': 0
},
'broadcast': {
'from': 13,
'to': 0
},
'unicast': {
'from': 0,
'to': 0
}
}
LOGGER.info(result)
self.assertEqual(result[0], 'Informational')
self.assertEqual(result[1], 'Packet types detected: Multicast, Broadcast')
self.assertEqual(result[2], details_expected)
#self.assertEqual(result[0], True)


if __name__ == '__main__':
suite = unittest.TestSuite()

Expand All @@ -162,6 +192,9 @@ def connection_switch_dhcp_snooping_icmp_test(self):
suite.addTest(
ConnectionModuleTest('connection_switch_dhcp_snooping_icmp_test'))

# DHCP Snooping related tests
suite.addTest(ConnectionModuleTest('communication_network_type_test'))

runner = unittest.TextTestRunner()
test_result = runner.run(suite)

Expand Down