Skip to content

Start working on some updates to NUClearNet to make it more testable #183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
10 changes: 9 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ configure_file(nuclear.in ${PROJECT_BINARY_DIR}/nuclear)

# Build the library
find_package(Threads REQUIRED)
file(GLOB_RECURSE src "*.c" "*.cpp" "*.hpp" "*.ipp")
file(
GLOB_RECURSE
src
CONFIGURE_DEPENDS
"*.c"
"*.cpp"
"*.hpp"
"*.ipp"
)
add_library(nuclear STATIC ${src})
add_library(NUClear::nuclear ALIAS nuclear)

Expand Down
52 changes: 18 additions & 34 deletions src/extension/network/NUClearNetwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstring>
Expand Down Expand Up @@ -501,7 +500,7 @@ namespace extension {
if (ptr) {

auto now = std::chrono::steady_clock::now();
auto timeout = it->last_send + ptr->round_trip_time;
auto timeout = it->last_send + ptr->rtt.timeout();

// Check if we should have expected an ack by now for some packets
if (timeout < now) {
Expand All @@ -510,7 +509,7 @@ namespace extension {
it->last_send = now;

// The next time we should check for a timeout
auto next_timeout = now + ptr->round_trip_time;
auto next_timeout = now + ptr->rtt.timeout();
if (next_timeout < next_event) {
next_event = next_timeout;
next_event_callback(next_event);
Expand Down Expand Up @@ -673,18 +672,11 @@ namespace extension {
remote->last_update = std::chrono::steady_clock::now();

// Check if this packet is a retransmission of data
if (header.type == DATA_RETRANSMISSION) {
if (header.type == DATA_RETRANSMISSION
&& remote->deduplicator.is_duplicate(packet.packet_id)) {

// See if we recently processed this packet
// NOLINTNEXTLINE(readability-qualified-auto) MSVC disagrees
auto it = std::find(remote->recent_packets.begin(),
remote->recent_packets.end(),
packet.packet_id);

// We recently processed this packet, this is just a failed ack
// Send the ack again if it was reliable
if (it != remote->recent_packets.end() && packet.reliable) {

if (packet.reliable) {
// Allocate room for the whole ack packet
std::vector<uint8_t> r(sizeof(ACKPacket) + (packet.packet_count / 8), 0);
ACKPacket& response = *reinterpret_cast<ACKPacket*>(r.data());
Expand All @@ -708,10 +700,10 @@ namespace extension {
0,
&to.sock,
to.size());

// We don't need to process this packet we already did
return;
}

// We don't need to process this packet we already did
return;
}

// If this is a solo packet (in a single chunk)
Expand Down Expand Up @@ -739,13 +731,11 @@ namespace extension {
0,
&to.sock,
to.size());

// Set this packet to have been recently received
remote->recent_packets[remote->recent_packets_index
.fetch_add(1, std::memory_order_relaxed)] =
packet.packet_id;
}

// Add the packet to our deduplicator
remote->deduplicator.add_packet(packet.packet_id);

packet_callback(*remote, packet.hash, packet.reliable, std::move(out));
}
else {
Expand Down Expand Up @@ -851,25 +841,20 @@ namespace extension {
&part.data + p.second.size() - sizeof(DataPacket) + 1);
}

// Add the packet to our deduplicator
remote->deduplicator.add_packet(packet.packet_id);

// Send our assembled data packet
packet_callback(*remote, packet.hash, packet.reliable, std::move(out));

// If the packet was reliable add that it was recently received
if (packet.reliable) {
// Set this packet to have been recently received
remote->recent_packets[remote->recent_packets_index
.fetch_add(1, std::memory_order_relaxed)] =
packet.packet_id;
}

// We have completed this packet, discard the data
assemblers.erase(assemblers.find(packet.packet_id));
}

// Check for and delete any timed out packets
for (auto it = assemblers.begin(); it != assemblers.end();) {
const auto now = std::chrono::steady_clock::now();
const auto timeout = remote->round_trip_time * 10.0;
const auto timeout = remote->rtt.timeout() * 10.0;
const auto& last_chunk_time = it->second.first;

it = now > last_chunk_time + timeout ? assemblers.erase(it) : std::next(it);
Expand Down Expand Up @@ -919,8 +904,7 @@ namespace extension {

// Approximate how long the round trip is to this remote so we can work out how
// long before retransmitting
// We use a baby kalman filter to help smooth out jitter
remote->measure_round_trip(round_trip);
remote->rtt.measure(round_trip);

// Update our acks
bool all_acked = true;
Expand Down Expand Up @@ -987,7 +971,7 @@ namespace extension {
s->last_send = std::chrono::steady_clock::now();

// The next time we should check for a timeout
auto next_timeout = s->last_send + remote->round_trip_time;
auto next_timeout = s->last_send + remote->rtt.timeout();
if (next_timeout < next_event) {
next_event = next_timeout;
next_event_callback(next_event);
Expand Down Expand Up @@ -1108,7 +1092,7 @@ namespace extension {
queue.targets.emplace_back(it->second, acks);

// The next time we should check for a timeout
auto next_timeout = std::chrono::steady_clock::now() + it->second->round_trip_time;
auto next_timeout = std::chrono::steady_clock::now() + it->second->rtt.timeout();
if (next_timeout < next_event) {
next_event = next_timeout;
next_event_callback(next_event);
Expand Down
50 changes: 7 additions & 43 deletions src/extension/network/NUClearNetwork.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

#include "../../util/network/sock_t.hpp"
#include "../../util/platform.hpp"
#include "PacketDeduplicator.hpp"
#include "RTTEstimator.hpp"
#include "wire_protocol.hpp"

namespace NUClear {
Expand All @@ -56,64 +58,26 @@ namespace extension {
std::string name,
const sock_t& target,
const std::chrono::steady_clock::time_point& last_update = std::chrono::steady_clock::now())
: name(std::move(name)), target(target), last_update(last_update) {

// Set our recent packets to an invalid value
recent_packets.fill(-1);
}
: name(std::move(name)), target(target), last_update(last_update) {}

/// The name of the remote target
std::string name;
/// The socket address for the remote target
sock_t target{};
/// When we last received data from the remote target
std::chrono::steady_clock::time_point last_update;
/// A list of the last n packet groups to be received
std::array<int, std::numeric_limits<uint8_t>::max()> recent_packets{};
/// An index for the recent_packets (circular buffer)
std::atomic<uint8_t> recent_packets_index{0};
/// Mutex to protect the fragmented packet storage
std::mutex assemblers_mutex;
/// Storage for fragmented packets while we build them
std::map<uint16_t,
std::pair<std::chrono::steady_clock::time_point, std::map<uint16_t, std::vector<uint8_t>>>>
assemblers;

/// Struct storing the kalman filter for round trip time
struct RoundTripKF {
float process_noise = 1e-6f;
float measurement_noise = 1e-1f;
float variance = 1.0f;
float mean = 1.0f;
};
/// A little kalman filter for estimating round trip time
RoundTripKF round_trip_kf{};

std::chrono::steady_clock::duration round_trip_time{std::chrono::seconds(1)};

void measure_round_trip(std::chrono::steady_clock::duration time) {

// Make our measurement into a float seconds type
const std::chrono::duration<float> m =
std::chrono::duration_cast<std::chrono::duration<float>>(time);

// Alias variables
const auto& Q = round_trip_kf.process_noise;
const auto& R = round_trip_kf.measurement_noise;
auto& P = round_trip_kf.variance;
auto& X = round_trip_kf.mean;

// Calculate our kalman gain
const float K = (P + Q) / (P + Q + R);

// Do filter
P = R * (P + Q) / (R + P + Q);
X = X + (m.count() - X) * K;
/// RTT estimator for this network target
RTTEstimator rtt;

// Put result into our variable
round_trip_time = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<float>(X));
}
/// Packet deduplicator for this network target
PacketDeduplicator deduplicator;
};

NUClearNetwork() = default;
Expand Down
76 changes: 76 additions & 0 deletions src/extension/network/PacketDeduplicator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* MIT License
*
* Copyright (c) 2025 NUClear Contributors
*
* This file is part of the NUClear codebase.
* See https://github.com/Fastcode/NUClear for further info.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "PacketDeduplicator.hpp"

#include <cstdint>

namespace NUClear {
namespace extension {
namespace network {


bool PacketDeduplicator::is_duplicate(uint16_t packet_id) const {
// If we haven't seen any packets yet, nothing is a duplicate
if (!initialized) {
return false;
}

// Calculate relative position in window using unsigned subtraction
const uint16_t relative_id = newest_seen - packet_id;

// If the packet is too old or too new, it's not a duplicate
if (relative_id >= 256) {
return false;
}

return window[relative_id];
}

void PacketDeduplicator::add_packet(uint16_t packet_id) {
// If this is our first packet, just set it as newest_seen
if (!initialized) {
newest_seen = packet_id;
window[0] = true;
initialized = true;
return;
}

// Calculate relative position in window using unsigned subtraction
const uint16_t relative_id = newest_seen - packet_id;

// If the distance is more than half the range, the packet is newer than our newest_seen
if (relative_id > 32768) {
// Calculate how far to shift to make this packet our newest
const uint16_t shift_amount = packet_id - newest_seen;
window <<= shift_amount;
newest_seen = packet_id;
window[0] = true;
}
// Packet is recent enough to be counted
else if (relative_id < 256) {
window[relative_id] = true;
}
}

} // namespace network
} // namespace extension
} // namespace NUClear
67 changes: 67 additions & 0 deletions src/extension/network/PacketDeduplicator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* MIT License
*
* Copyright (c) 2025 NUClear Contributors
*
* This file is part of the NUClear codebase.
* See https://github.com/Fastcode/NUClear for further info.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef NUCLEAR_EXTENSION_NETWORK_PACKET_DEDUPLICATOR_HPP
#define NUCLEAR_EXTENSION_NETWORK_PACKET_DEDUPLICATOR_HPP

#include <bitset>
#include <cstdint>

namespace NUClear {
namespace extension {
namespace network {

/**
* A class that implements a sliding window bitset for packet deduplication.
* Maintains a 256-bit window of recently seen packet IDs, sliding forward as new packets are added.
*/
class PacketDeduplicator {
public:
/**
* Check if a packet ID has been seen recently
*
* @param packet_id The packet ID to check
*
* @return true if the packet has been seen recently, false otherwise
*/
bool is_duplicate(uint16_t packet_id) const;

/**
* Add a packet ID to the window
*
* @param packet_id The packet ID to add
*/
void add_packet(uint16_t packet_id);

private:
/// Whether we've seen any packets yet
bool initialized{false};
/// The newest packet ID we've seen
uint16_t newest_seen{0};
/// The 256-bit window of seen packets (newest at 0, older at higher indices)
std::bitset<256> window;
};

} // namespace network
} // namespace extension
} // namespace NUClear

#endif // NUCLEAR_EXTENSION_NETWORK_PACKET_DEDUPLICATOR_HPP
Loading
Loading