From 050152eb90bc6bed54c28817d156ff8cdd085638 Mon Sep 17 00:00:00 2001 From: sfan5 Date: Fri, 5 Jan 2024 13:54:50 +0100 Subject: [PATCH] Do not allocate packet quota to half-open connections --- src/network/connection.cpp | 15 ++++++++++++ src/network/connection.h | 2 ++ src/network/connectionthreads.cpp | 38 ++++++++++++++++--------------- src/network/connectionthreads.h | 4 ++-- 4 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/network/connection.cpp b/src/network/connection.cpp index e3d8d3999..8c9f1cbac 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -1397,6 +1397,21 @@ session_t Connection::lookupPeer(const Address& sender) return PEER_ID_INEXISTENT; } +u32 Connection::getActiveCount() +{ + MutexAutoLock peerlock(m_peers_mutex); + u32 count = 0; + for (auto &it : m_peers) { + Peer *peer = it.second; + if (peer->isPendingDeletion()) + continue; + if (peer->isHalfOpen()) + continue; + count++; + } + return count; +} + bool Connection::deletePeer(session_t peer_id, bool timeout) { Peer *peer = 0; diff --git a/src/network/connection.h b/src/network/connection.h index a14ebb9e9..5dc97441a 100644 --- a/src/network/connection.h +++ b/src/network/connection.h @@ -764,6 +764,8 @@ protected: return m_peer_ids; } + u32 getActiveCount(); + UDPSocket m_udpSocket; // Command queue: user -> SendThread MutexedQueue m_command_queue; diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp index 970b7d1ed..d99169160 100644 --- a/src/network/connectionthreads.cpp +++ b/src/network/connectionthreads.cpp @@ -86,8 +86,6 @@ void *ConnectionSendThread::run() BEGIN_DEBUG_EXCEPTION_HANDLER PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); - m_iteration_packets_avaialble = m_max_data_packets_per_iteration; - /* wait for trigger or timeout */ m_send_sleep_semaphore.wait(50); @@ -99,8 +97,16 @@ void *ConnectionSendThread::run() curtime = porting::getTimeMs(); float dtime = CALC_DTIME(lasttime, curtime); + m_iteration_packets_avaialble = m_max_data_packets_per_iteration; + const auto &calculate_quota = [&] () -> u32 { + u32 numpeers = m_connection->getActiveCount(); + if (numpeers > 0) + return MYMAX(1, m_iteration_packets_avaialble / numpeers); + return m_iteration_packets_avaialble; + }; + /* first resend timed-out packets */ - runTimeouts(dtime); + runTimeouts(dtime, calculate_quota()); if (m_iteration_packets_avaialble == 0) { LOG(warningstream << m_connection->getDesc() << " Packet quota used up after re-sending packets, " @@ -119,7 +125,7 @@ void *ConnectionSendThread::run() } /* send queued packets */ - sendPackets(dtime); + sendPackets(dtime, calculate_quota()); END_DEBUG_EXCEPTION_HANDLER } @@ -160,17 +166,12 @@ bool ConnectionSendThread::packetsQueued() return false; } -void ConnectionSendThread::runTimeouts(float dtime) +void ConnectionSendThread::runTimeouts(float dtime, u32 peer_packet_quota) { std::vector timeouted_peers; std::vector peerIds = m_connection->getPeerIDs(); - const u32 numpeers = m_connection->m_peers.size(); - - if (numpeers == 0) - return; - - for (session_t &peerId : peerIds) { + for (const session_t peerId : peerIds) { PeerHelper peer = m_connection->getPeerNoEx(peerId); if (!peer) @@ -217,8 +218,8 @@ void ConnectionSendThread::runTimeouts(float dtime) channel.outgoing_reliables_sent.incrementTimeouts(dtime); // Re-send timed out outgoing reliables - auto timed_outs = channel.outgoing_reliables_sent.getResend(resend_timeout, - (m_max_data_packets_per_iteration / numpeers)); + auto timed_outs = channel.outgoing_reliables_sent.getResend( + resend_timeout, peer_packet_quota); channel.UpdatePacketLossCounter(timed_outs.size()); g_profiler->graphAdd("packets_lost", timed_outs.size()); @@ -235,7 +236,11 @@ void ConnectionSendThread::runTimeouts(float dtime) continue; } - m_iteration_packets_avaialble -= timed_outs.size(); + if (m_iteration_packets_avaialble > timed_outs.size()) + m_iteration_packets_avaialble -= timed_outs.size(); + else + m_iteration_packets_avaialble = 0; + for (const auto &k : timed_outs) resendReliable(channel, k.get(), resend_timeout); @@ -643,15 +648,12 @@ void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c) } } -void ConnectionSendThread::sendPackets(float dtime) +void ConnectionSendThread::sendPackets(float dtime, u32 peer_packet_quota) { std::vector peerIds = m_connection->getPeerIDs(); std::vector pendingDisconnect; std::map pending_unreliable; - const unsigned int peer_packet_quota = m_iteration_packets_avaialble - / MYMAX(peerIds.size(), 1); - for (session_t peerId : peerIds) { PeerHelper peer = m_connection->getPeerNoEx(peerId); //peer may have been removed diff --git a/src/network/connectionthreads.h b/src/network/connectionthreads.h index d8e083351..3ba45b3b8 100644 --- a/src/network/connectionthreads.h +++ b/src/network/connectionthreads.h @@ -69,7 +69,7 @@ public: void setPeerTimeout(float peer_timeout) { m_timeout = peer_timeout; } private: - void runTimeouts(float dtime); + void runTimeouts(float dtime, u32 peer_packet_quota); void resendReliable(Channel &channel, const BufferedPacket *k, float resend_timeout); void rawSend(const BufferedPacket *p); bool rawSendAsPacket(session_t peer_id, u8 channelnum, @@ -86,7 +86,7 @@ private: void sendToAll(u8 channelnum, const SharedBuffer &data); void sendToAllReliable(ConnectionCommandPtr &c); - void sendPackets(float dtime); + void sendPackets(float dtime, u32 peer_packet_quota); void sendAsPacket(session_t peer_id, u8 channelnum, const SharedBuffer &data, bool ack = false);