diff --git a/src/network/connection.cpp b/src/network/connection.cpp index dada76f83..5bc22db74 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -194,7 +194,7 @@ u32 ReliablePacketBuffer::size() return m_list.size(); } -RPBSearchResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum) +ReliablePacketBuffer::FindResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum) { for (auto it = m_list.begin(); it != m_list.end(); ++it) { if ((*it)->getSeqnum() == seqnum) @@ -232,7 +232,7 @@ BufferedPacketPtr ReliablePacketBuffer::popFirst() BufferedPacketPtr ReliablePacketBuffer::popSeqnum(u16 seqnum) { MutexAutoLock listlock(m_list_mutex); - RPBSearchResult r = findPacketNoLock(seqnum); + auto r = findPacketNoLock(seqnum); if (r == m_list.end()) { LOG(dout_con<<"Sequence number: " << seqnum << " not found in reliable buffer"<getSeqnum(), i->size(), i->address.serializeString().c_str()); - fprintf(stderr, "New: seqnum: %05d size: %04zu, address: %s\n", + warningstream << buf; + snprintf(buf, sizeof(buf), + "New: seqnum: %05d size: %04zu, address: %s\n", p.getSeqnum(), p.size(), p.address.serializeString().c_str()); + warningstream << buf << std::flush; throw IncomingDataCorruption("duplicated packet isn't same as original one"); } } @@ -357,11 +363,11 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) } } -std::list> +std::vector> ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets) { MutexAutoLock listlock(m_list_mutex); - std::list> timed_outs; + std::vector> timed_outs; for (auto &packet : m_list) { // resend time scales exponentially with each cycle const float pkt_timeout = timeout * powf(RESEND_SCALE_BASE, packet->resend_count); @@ -504,9 +510,9 @@ SharedBuffer IncomingSplitBuffer::insert(BufferedPacketPtr &p_ptr, bool reli void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) { + MutexAutoLock listlock(m_map_mutex); std::vector remove_queue; { - MutexAutoLock listlock(m_map_mutex); for (const auto &i : m_buf) { IncomingSplitPacket *p = i.second; // Reliable ones are not removed by timeout @@ -518,10 +524,10 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) } } for (u16 j : remove_queue) { - MutexAutoLock listlock(m_map_mutex); LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<second; + m_buf.erase(it); } } @@ -967,7 +973,7 @@ void Peer::Drop() delete this; } -UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) : +UDPPeer::UDPPeer(session_t a_id, Address a_address, Connection* connection) : Peer(a_address,a_id,connection) { for (Channel &channel : channels) @@ -1134,8 +1140,6 @@ bool UDPPeer::processReliableSendCommand( FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error"); } - // DO NOT REMOVE n_queued! It avoids a deadlock of async locked - // 'log_message_mutex' and 'm_list_mutex'. u32 n_queued = chan.outgoing_reliables_sent.size(); LOG(dout_con<getDesc() @@ -1339,7 +1343,7 @@ PeerHelper Connection::getPeerNoEx(session_t peer_id) } /* find peer_id for address */ -u16 Connection::lookupPeer(Address& sender) +session_t Connection::lookupPeer(const Address& sender) { MutexAutoLock peerlock(m_peers_mutex); std::map::iterator j; @@ -1559,7 +1563,7 @@ float Connection::getLocalStat(rate_stat_type type) return retval; } -u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) +session_t Connection::createPeer(Address& sender, MTProtocols protocol, int fd) { // Somebody wants to make a new connection diff --git a/src/network/connection.h b/src/network/connection.h index edecb1df3..6704cecdd 100644 --- a/src/network/connection.h +++ b/src/network/connection.h @@ -250,8 +250,6 @@ private: for fast access to the smallest one. */ -typedef std::list::iterator RPBSearchResult; - class ReliablePacketBuffer { public: @@ -264,7 +262,7 @@ public: void insert(BufferedPacketPtr &p_ptr, u16 next_expected); void incrementTimeouts(float dtime); - std::list> getTimedOuts(float timeout, u32 max_packets); + std::vector> getTimedOuts(float timeout, u32 max_packets); void print(); bool empty(); @@ -272,7 +270,9 @@ public: private: - RPBSearchResult findPacketNoLock(u16 seqnum); + typedef std::list::iterator FindResult; + + FindResult findPacketNoLock(u16 seqnum); std::list m_list; @@ -743,9 +743,9 @@ public: protected: PeerHelper getPeerNoEx(session_t peer_id); - u16 lookupPeer(Address& sender); + session_t lookupPeer(const Address& sender); - u16 createPeer(Address& sender, MTProtocols protocol, int fd); + session_t createPeer(Address& sender, MTProtocols protocol, int fd); UDPPeer* createServerPeer(Address& sender); bool deletePeer(session_t peer_id, bool timeout); diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp index 96a89b1b5..b5128a893 100644 --- a/src/network/connectionthreads.cpp +++ b/src/network/connectionthreads.cpp @@ -44,8 +44,6 @@ namespace con // TODO: Clean this up. #define LOG(a) a -#define WINDOW_SIZE 5 - static inline session_t readPeerId(const u8 *packetdata) { return readU16(&packetdata[4]); @@ -519,9 +517,9 @@ void ConnectionSendThread::serve(Address bind_address) void ConnectionSendThread::connect(Address address) { - LOG(dout_con << m_connection->getDesc() << " connecting to " - << address.serializeString() - << ":" << address.getPort() << std::endl); + dout_con << m_connection->getDesc() << " connecting to "; + address.print(dout_con); + dout_con << std::endl; UDPPeer *peer = m_connection->createServerPeer(address); @@ -529,11 +527,10 @@ void ConnectionSendThread::connect(Address address) m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address)); Address bind_addr; - if (address.isIPv6()) - bind_addr.setAddress((IPv6AddressBytes *) NULL); + bind_addr.setAddress(static_cast(nullptr)); else - bind_addr.setAddress(0, 0, 0, 0); + bind_addr.setAddress(static_cast(0)); m_connection->m_udpSocket.Bind(bind_addr); @@ -951,9 +948,9 @@ void ConnectionReceiveThread::receive(SharedBuffer &packetdata, session_t peer_id = readPeerId(*packetdata); u8 channelnum = readChannel(*packetdata); - if (channelnum > CHANNEL_COUNT - 1) { + if (channelnum >= CHANNEL_COUNT) { LOG(derr_con << m_connection->getDesc() - << "Receive(): Invalid channel " << (u32)channelnum << std::endl); + << "Receive(): Invalid channel " << (int)channelnum << std::endl); return; } @@ -1008,15 +1005,14 @@ void ConnectionReceiveThread::receive(SharedBuffer &packetdata, peer->ResetTimeout(); } - Channel *channel = nullptr; - if (dynamic_cast(&peer)) { - channel = &dynamic_cast(&peer)->channels[channelnum]; - } else { + auto *udpPeer = dynamic_cast(&peer); + if (!udpPeer) { LOG(derr_con << m_connection->getDesc() << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!" " Ignoring." << std::endl); return; } + Channel *channel = &udpPeer->channels[channelnum]; channel->UpdateBytesReceived(received_size);