diff --git a/src/network/connection.cpp b/src/network/connection.cpp index 8c9f1cbac..0110f8c22 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -21,7 +21,7 @@ with this program; if not, write to the Free Software Foundation, Inc., #include #include #include -#include "connection.h" +#include "connection_internal.h" #include "serialization.h" #include "log.h" #include "porting.h" diff --git a/src/network/connection.h b/src/network/connection.h index 4c7c7a609..13a2cabb0 100644 --- a/src/network/connection.h +++ b/src/network/connection.h @@ -32,95 +32,6 @@ with this program; if not, write to the Free Software Foundation, Inc., #include #include -#define MAX_UDP_PEERS 65535 - -/* -=== NOTES === - -A packet is sent through a channel to a peer with a basic header: - Header (7 bytes): - [0] u32 protocol_id - [4] session_t sender_peer_id - [6] u8 channel -sender_peer_id: - Unique to each peer. - value 0 (PEER_ID_INEXISTENT) is reserved for making new connections - value 1 (PEER_ID_SERVER) is reserved for server - these constants are defined in constants.h -channel: - Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist. -*/ -#define BASE_HEADER_SIZE 7 -#define CHANNEL_COUNT 3 - -/* -Packet types: - -CONTROL: This is a packet used by the protocol. -- When this is processed, nothing is handed to the user. - Header (2 byte): - [0] u8 type - [1] u8 controltype -controltype and data description: - CONTROLTYPE_ACK - [2] u16 seqnum - CONTROLTYPE_SET_PEER_ID - [2] session_t peer_id_new - CONTROLTYPE_PING - - There is no actual reply, but this can be sent in a reliable - packet to get a reply - CONTROLTYPE_DISCO -*/ -enum ControlType : u8 { - CONTROLTYPE_ACK = 0, - CONTROLTYPE_SET_PEER_ID = 1, - CONTROLTYPE_PING = 2, - CONTROLTYPE_DISCO = 3, -}; - -/* -ORIGINAL: This is a plain packet with no control and no error -checking at all. -- When this is processed, it is directly handed to the user. - Header (1 byte): - [0] u8 type -*/ -//#define TYPE_ORIGINAL 1 -#define ORIGINAL_HEADER_SIZE 1 - -/* -SPLIT: These are sequences of packets forming one bigger piece of -data. -- When processed and all the packet_nums 0...packet_count-1 are - present (this should be buffered), the resulting data shall be - directly handed to the user. -- If the data fails to come up in a reasonable time, the buffer shall - be silently discarded. -- These can be sent as-is or atop of a RELIABLE packet stream. - Header (7 bytes): - [0] u8 type - [1] u16 seqnum - [3] u16 chunk_count - [5] u16 chunk_num -*/ -//#define TYPE_SPLIT 2 - -/* -RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs, -and they shall be delivered in the same order as sent. This is done -with a buffer in the receiving and transmitting end. -- When this is processed, the contents of each packet is recursively - processed as packets. - Header (3 bytes): - [0] u8 type - [1] u16 seqnum - -*/ -//#define TYPE_RELIABLE 3 -#define RELIABLE_HEADER_SIZE 3 -#define SEQNUM_INITIAL 65500 -#define SEQNUM_MAX 65535 - class NetworkPacket; namespace con @@ -129,336 +40,19 @@ namespace con class ConnectionReceiveThread; class ConnectionSendThread; -typedef enum MTProtocols { +enum MTProtocols { MTP_PRIMARY, MTP_UDP, MTP_MINETEST_RELIABLE_UDP -} MTProtocols; - -enum PacketType : u8 { - PACKET_TYPE_CONTROL = 0, - PACKET_TYPE_ORIGINAL = 1, - PACKET_TYPE_SPLIT = 2, - PACKET_TYPE_RELIABLE = 3, - PACKET_TYPE_MAX }; -inline bool seqnum_higher(u16 totest, u16 base) -{ - if (totest > base) - { - if ((totest - base) > (SEQNUM_MAX/2)) - return false; - - return true; - } - - if ((base - totest) > (SEQNUM_MAX/2)) - return true; - - return false; -} - -inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size) -{ - u16 window_start = next; - u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1); - - if (window_start < window_end) { - return ((seqnum >= window_start) && (seqnum < window_end)); - } - - - return ((seqnum < window_end) || (seqnum >= window_start)); -} - -static inline float CALC_DTIME(u64 lasttime, u64 curtime) -{ - float value = ( curtime - lasttime) / 1000.0; - return MYMAX(MYMIN(value,0.1),0.0); -} - -/* - Struct for all kinds of packets. Includes following data: - BASE_HEADER - u8[] packet data (usually copied from SharedBuffer) -*/ -struct BufferedPacket { - BufferedPacket(u32 a_size) - { - m_data.resize(a_size); - data = &m_data[0]; - } - - DISABLE_CLASS_COPY(BufferedPacket) - - u16 getSeqnum() const; - - inline size_t size() const { return m_data.size(); } - - u8 *data; // Direct memory access - float time = 0.0f; // Seconds from buffering the packet or re-sending - float totaltime = 0.0f; // Seconds from buffering the packet - u64 absolute_send_time = -1; - Address address; // Sender or destination - unsigned int resend_count = 0; - -private: - std::vector m_data; // Data of the packet, including headers -}; - -typedef std::shared_ptr BufferedPacketPtr; - - -// This adds the base headers to the data and makes a packet out of it -BufferedPacketPtr makePacket(Address &address, const SharedBuffer &data, - u32 protocol_id, session_t sender_peer_id, u8 channel); - -// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet -// Increments split_seqnum if a split packet is made -void makeAutoSplitPacket(const SharedBuffer &data, u32 chunksize_max, - u16 &split_seqnum, std::list> *list); - -// Add the TYPE_RELIABLE header to the data -SharedBuffer makeReliablePacket(const SharedBuffer &data, u16 seqnum); - -struct IncomingSplitPacket -{ - IncomingSplitPacket(u32 cc, bool r): - chunk_count(cc), reliable(r) {} - - IncomingSplitPacket() = delete; - - float time = 0.0f; // Seconds from adding - u32 chunk_count; - bool reliable; // If true, isn't deleted on timeout - - bool allReceived() const - { - return (chunks.size() == chunk_count); - } - bool insert(u32 chunk_num, SharedBuffer &chunkdata); - SharedBuffer reassemble(); - -private: - // Key is chunk number, value is data without headers - std::map> chunks; -}; - -/* - A buffer which stores reliable packets and sorts them internally - for fast access to the smallest one. -*/ - -class ReliablePacketBuffer -{ -public: - ReliablePacketBuffer() = default; - - bool getFirstSeqnum(u16& result); - - BufferedPacketPtr popFirst(); - BufferedPacketPtr popSeqnum(u16 seqnum); - void insert(BufferedPacketPtr &p_ptr, u16 next_expected); - - void incrementTimeouts(float dtime); - u32 getTimedOuts(float timeout); - // timeout relative to last resend - std::vector> getResend(float timeout, u32 max_packets); - - void print(); - bool empty(); - u32 size(); - - -private: - typedef std::list::iterator FindResult; - - FindResult findPacketNoLock(u16 seqnum); - - std::list m_list; - - u16 m_oldest_non_answered_ack; - - std::mutex m_list_mutex; -}; - -/* - A buffer for reconstructing split packets -*/ - -class IncomingSplitBuffer -{ -public: - ~IncomingSplitBuffer(); - /* - Returns a reference counted buffer of length != 0 when a full split - packet is constructed. If not, returns one of length 0. - */ - SharedBuffer insert(BufferedPacketPtr &p_ptr, bool reliable); - - void removeUnreliableTimedOuts(float dtime, float timeout); - -private: - // Key is seqnum - std::map m_buf; - - std::mutex m_map_mutex; -}; - -enum ConnectionCommandType{ - CONNCMD_NONE, - CONNCMD_SERVE, - CONNCMD_CONNECT, - CONNCMD_DISCONNECT, - CONNCMD_DISCONNECT_PEER, - CONNCMD_SEND, - CONNCMD_SEND_TO_ALL, - CONCMD_ACK, - CONCMD_CREATE_PEER, - CONNCMD_RESEND_ONE -}; - -struct ConnectionCommand; -typedef std::shared_ptr ConnectionCommandPtr; - -// This is very similar to ConnectionEvent -struct ConnectionCommand -{ - const ConnectionCommandType type; - Address address; - session_t peer_id = PEER_ID_INEXISTENT; - u8 channelnum = 0; - Buffer data; - bool reliable = false; - bool raw = false; - - DISABLE_CLASS_COPY(ConnectionCommand); - - static ConnectionCommandPtr serve(Address address); - static ConnectionCommandPtr connect(Address address); - static ConnectionCommandPtr disconnect(); - static ConnectionCommandPtr disconnect_peer(session_t peer_id); - static ConnectionCommandPtr resend_one(session_t peer_id); - static ConnectionCommandPtr send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable); - static ConnectionCommandPtr ack(session_t peer_id, u8 channelnum, const Buffer &data); - static ConnectionCommandPtr createPeer(session_t peer_id, const Buffer &data); - -private: - ConnectionCommand(ConnectionCommandType type_) : - type(type_) {} - - static ConnectionCommandPtr create(ConnectionCommandType type); -}; - -/* maximum window size to use, 0xFFFF is theoretical maximum. don't think about - * touching it, the less you're away from it the more likely data corruption - * will occur - */ -#define MAX_RELIABLE_WINDOW_SIZE 0x8000 -/* starting value for window size */ -#define START_RELIABLE_WINDOW_SIZE 0x400 -/* minimum value for window size */ -#define MIN_RELIABLE_WINDOW_SIZE 0x40 - -class Channel -{ - -public: - u16 readNextIncomingSeqNum(); - u16 incNextIncomingSeqNum(); - - u16 getOutgoingSequenceNumber(bool& successful); - u16 readOutgoingSequenceNumber(); - bool putBackSequenceNumber(u16); - - u16 readNextSplitSeqNum(); - void setNextSplitSeqNum(u16 seqnum); - - // This is for buffering the incoming packets that are coming in - // the wrong order - ReliablePacketBuffer incoming_reliables; - // This is for buffering the sent packets so that the sender can - // re-send them if no ACK is received - ReliablePacketBuffer outgoing_reliables_sent; - - //queued reliable packets - std::queue queued_reliables; - - //queue commands prior splitting to packets - std::deque queued_commands; - - IncomingSplitBuffer incoming_splits; - - Channel() = default; - ~Channel() = default; - - void UpdatePacketLossCounter(unsigned int count); - void UpdatePacketTooLateCounter(); - void UpdateBytesSent(unsigned int bytes,unsigned int packages=1); - void UpdateBytesLost(unsigned int bytes); - void UpdateBytesReceived(unsigned int bytes); - - void UpdateTimers(float dtime); - - float getCurrentDownloadRateKB() - { MutexAutoLock lock(m_internal_mutex); return cur_kbps; }; - float getMaxDownloadRateKB() - { MutexAutoLock lock(m_internal_mutex); return max_kbps; }; - - float getCurrentLossRateKB() - { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; }; - float getMaxLossRateKB() - { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; }; - - float getCurrentIncomingRateKB() - { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; }; - float getMaxIncomingRateKB() - { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; }; - - float getAvgDownloadRateKB() - { MutexAutoLock lock(m_internal_mutex); return avg_kbps; }; - float getAvgLossRateKB() - { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; }; - float getAvgIncomingRateKB() - { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; }; - - u16 getWindowSize() const { return m_window_size; }; - - void setWindowSize(long size) - { - m_window_size = (u16)rangelim(size, MIN_RELIABLE_WINDOW_SIZE, MAX_RELIABLE_WINDOW_SIZE); - } - -private: - std::mutex m_internal_mutex; - u16 m_window_size = MIN_RELIABLE_WINDOW_SIZE; - - u16 next_incoming_seqnum = SEQNUM_INITIAL; - - u16 next_outgoing_seqnum = SEQNUM_INITIAL; - u16 next_outgoing_split_seqnum = SEQNUM_INITIAL; - - unsigned int current_packet_loss = 0; - unsigned int current_packet_too_late = 0; - unsigned int current_packet_successful = 0; - float packet_loss_counter = 0.0f; - - unsigned int current_bytes_transfered = 0; - unsigned int current_bytes_received = 0; - unsigned int current_bytes_lost = 0; - float max_kbps = 0.0f; - float cur_kbps = 0.0f; - float avg_kbps = 0.0f; - float max_incoming_kbps = 0.0f; - float cur_incoming_kbps = 0.0f; - float avg_incoming_kbps = 0.0f; - float max_kbps_lost = 0.0f; - float cur_kbps_lost = 0.0f; - float avg_kbps_lost = 0.0f; - float bpm_counter = 0.0f; - - unsigned int rate_samples = 0; +enum rate_stat_type { + CUR_DL_RATE, + AVG_DL_RATE, + CUR_INC_RATE, + AVG_INC_RATE, + CUR_LOSS_RATE, + AVG_LOSS_RATE, }; class Peer; @@ -480,16 +74,54 @@ private: Peer *m_peer = nullptr; }; -class Connection; +/* + Connection +*/ -typedef enum { - CUR_DL_RATE, - AVG_DL_RATE, - CUR_INC_RATE, - AVG_INC_RATE, - CUR_LOSS_RATE, - AVG_LOSS_RATE, -} rate_stat_type; +enum ConnectionEventType { + CONNEVENT_NONE, + CONNEVENT_DATA_RECEIVED, + CONNEVENT_PEER_ADDED, + CONNEVENT_PEER_REMOVED, + CONNEVENT_BIND_FAILED, +}; + +struct ConnectionEvent; +typedef std::shared_ptr ConnectionEventPtr; + +// This is very similar to ConnectionCommand +struct ConnectionEvent +{ + const ConnectionEventType type; + session_t peer_id = 0; + Buffer data; + bool timeout = false; + Address address; + + // We don't want to copy "data" + DISABLE_CLASS_COPY(ConnectionEvent); + + static ConnectionEventPtr create(ConnectionEventType type); + static ConnectionEventPtr dataReceived(session_t peer_id, const Buffer &data); + static ConnectionEventPtr peerAdded(session_t peer_id, Address address); + static ConnectionEventPtr peerRemoved(session_t peer_id, bool is_timeout, Address address); + static ConnectionEventPtr bindFailed(); + + const char *describe() const; + +private: + ConnectionEvent(ConnectionEventType type_) : + type(type_) {} +}; + +struct ConnectionCommand; +typedef std::shared_ptr ConnectionCommandPtr; + +struct BufferedPacket; +typedef std::shared_ptr BufferedPacketPtr; + +class Connection; +class PeerHandler; class Peer { public: @@ -615,102 +247,7 @@ class Peer { u64 m_last_timeout_check; }; -class UDPPeer final : public Peer -{ -public: - - friend class PeerHelper; - friend class ConnectionReceiveThread; - friend class ConnectionSendThread; - friend class Connection; - - UDPPeer(u16 a_id, Address a_address, Connection* connection); - virtual ~UDPPeer() = default; - - void PutReliableSendCommand(ConnectionCommandPtr &c, - unsigned int max_packet_size) override; - - bool getAddress(MTProtocols type, Address& toset) override; - - u16 getNextSplitSequenceNumber(u8 channel) override; - void setNextSplitSequenceNumber(u8 channel, u16 seqnum) override; - - SharedBuffer addSplitPacket(u8 channel, BufferedPacketPtr &toadd, - bool reliable) override; - - bool isTimedOut(float timeout, std::string &reason) override; - -protected: - /* - Calculates avg_rtt and resend_timeout. - rtt=-1 only recalculates resend_timeout - */ - void reportRTT(float rtt) override; - - void RunCommandQueues( - unsigned int max_packet_size, - unsigned int maxtransfer); - - float getResendTimeout() - { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; } - - void setResendTimeout(float timeout) - { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; } - - bool Ping(float dtime, SharedBuffer& data) override; - - Channel channels[CHANNEL_COUNT]; - bool m_pending_disconnect = false; -private: - // This is changed dynamically - float resend_timeout = 0.5; - - bool processReliableSendCommand( - ConnectionCommandPtr &c_ptr, - unsigned int max_packet_size); -}; - -/* - Connection -*/ - -enum ConnectionEventType { - CONNEVENT_NONE, - CONNEVENT_DATA_RECEIVED, - CONNEVENT_PEER_ADDED, - CONNEVENT_PEER_REMOVED, - CONNEVENT_BIND_FAILED, -}; - -struct ConnectionEvent; -typedef std::shared_ptr ConnectionEventPtr; - -// This is very similar to ConnectionCommand -struct ConnectionEvent -{ - const ConnectionEventType type; - session_t peer_id = 0; - Buffer data; - bool timeout = false; - Address address; - - // We don't want to copy "data" - DISABLE_CLASS_COPY(ConnectionEvent); - - static ConnectionEventPtr create(ConnectionEventType type); - static ConnectionEventPtr dataReceived(session_t peer_id, const Buffer &data); - static ConnectionEventPtr peerAdded(session_t peer_id, Address address); - static ConnectionEventPtr peerRemoved(session_t peer_id, bool is_timeout, Address address); - static ConnectionEventPtr bindFailed(); - - const char *describe() const; - -private: - ConnectionEvent(ConnectionEventType type_) : - type(type_) {} -}; - -class PeerHandler; +class UDPPeer; class Connection { diff --git a/src/network/connection_internal.h b/src/network/connection_internal.h new file mode 100644 index 000000000..1bb95d2cf --- /dev/null +++ b/src/network/connection_internal.h @@ -0,0 +1,499 @@ +/* +Minetest +Copyright (C) 2013 celeron55, Perttu Ahola + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License along +with this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#pragma once + +/********************************************/ +/* may only be included from in src/network */ +/********************************************/ + +#include "connection.h" + +#define MAX_UDP_PEERS 65535 + +/* +=== NOTES === + +A packet is sent through a channel to a peer with a basic header: + Header (7 bytes): + [0] u32 protocol_id + [4] session_t sender_peer_id + [6] u8 channel +sender_peer_id: + Unique to each peer. + value 0 (PEER_ID_INEXISTENT) is reserved for making new connections + value 1 (PEER_ID_SERVER) is reserved for server + these constants are defined in constants.h +channel: + Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist. +*/ +#define BASE_HEADER_SIZE 7 +#define CHANNEL_COUNT 3 + +/* +Packet types: + +CONTROL: This is a packet used by the protocol. +- When this is processed, nothing is handed to the user. + Header (2 byte): + [0] u8 type + [1] u8 controltype +controltype and data description: + CONTROLTYPE_ACK + [2] u16 seqnum + CONTROLTYPE_SET_PEER_ID + [2] session_t peer_id_new + CONTROLTYPE_PING + - There is no actual reply, but this can be sent in a reliable + packet to get a reply + CONTROLTYPE_DISCO +*/ +enum ControlType : u8 { + CONTROLTYPE_ACK = 0, + CONTROLTYPE_SET_PEER_ID = 1, + CONTROLTYPE_PING = 2, + CONTROLTYPE_DISCO = 3, +}; + +/* +ORIGINAL: This is a plain packet with no control and no error +checking at all. +- When this is processed, it is directly handed to the user. + Header (1 byte): + [0] u8 type +*/ +//#define TYPE_ORIGINAL 1 +#define ORIGINAL_HEADER_SIZE 1 + +/* +SPLIT: These are sequences of packets forming one bigger piece of +data. +- When processed and all the packet_nums 0...packet_count-1 are + present (this should be buffered), the resulting data shall be + directly handed to the user. +- If the data fails to come up in a reasonable time, the buffer shall + be silently discarded. +- These can be sent as-is or atop of a RELIABLE packet stream. + Header (7 bytes): + [0] u8 type + [1] u16 seqnum + [3] u16 chunk_count + [5] u16 chunk_num +*/ +//#define TYPE_SPLIT 2 + +/* +RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs, +and they shall be delivered in the same order as sent. This is done +with a buffer in the receiving and transmitting end. +- When this is processed, the contents of each packet is recursively + processed as packets. + Header (3 bytes): + [0] u8 type + [1] u16 seqnum + +*/ +//#define TYPE_RELIABLE 3 +#define RELIABLE_HEADER_SIZE 3 +#define SEQNUM_INITIAL 65500 +#define SEQNUM_MAX 65535 + +namespace con +{ + + +enum PacketType : u8 { + PACKET_TYPE_CONTROL = 0, + PACKET_TYPE_ORIGINAL = 1, + PACKET_TYPE_SPLIT = 2, + PACKET_TYPE_RELIABLE = 3, + PACKET_TYPE_MAX +}; + +inline bool seqnum_higher(u16 totest, u16 base) +{ + if (totest > base) + { + if ((totest - base) > (SEQNUM_MAX/2)) + return false; + + return true; + } + + if ((base - totest) > (SEQNUM_MAX/2)) + return true; + + return false; +} + +inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size) +{ + u16 window_start = next; + u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1); + + if (window_start < window_end) { + return ((seqnum >= window_start) && (seqnum < window_end)); + } + + + return ((seqnum < window_end) || (seqnum >= window_start)); +} + +inline float CALC_DTIME(u64 lasttime, u64 curtime) +{ + float value = (curtime - lasttime) / 1000.0f; + return MYMAX(MYMIN(value, 0.1f), 0.0f); +} + + +/* + Struct for all kinds of packets. Includes following data: + BASE_HEADER + u8[] packet data (usually copied from SharedBuffer) +*/ +struct BufferedPacket { + BufferedPacket(u32 a_size) + { + m_data.resize(a_size); + data = &m_data[0]; + } + + DISABLE_CLASS_COPY(BufferedPacket) + + u16 getSeqnum() const; + + inline size_t size() const { return m_data.size(); } + + u8 *data; // Direct memory access + float time = 0.0f; // Seconds from buffering the packet or re-sending + float totaltime = 0.0f; // Seconds from buffering the packet + u64 absolute_send_time = -1; + Address address; // Sender or destination + unsigned int resend_count = 0; + +private: + std::vector m_data; // Data of the packet, including headers +}; + + +// This adds the base headers to the data and makes a packet out of it +BufferedPacketPtr makePacket(Address &address, const SharedBuffer &data, + u32 protocol_id, session_t sender_peer_id, u8 channel); + +// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet +// Increments split_seqnum if a split packet is made +void makeAutoSplitPacket(const SharedBuffer &data, u32 chunksize_max, + u16 &split_seqnum, std::list> *list); + +// Add the TYPE_RELIABLE header to the data +SharedBuffer makeReliablePacket(const SharedBuffer &data, u16 seqnum); + +struct IncomingSplitPacket +{ + IncomingSplitPacket(u32 cc, bool r): + chunk_count(cc), reliable(r) {} + + IncomingSplitPacket() = delete; + + float time = 0.0f; // Seconds from adding + u32 chunk_count; + bool reliable; // If true, isn't deleted on timeout + + bool allReceived() const + { + return (chunks.size() == chunk_count); + } + bool insert(u32 chunk_num, SharedBuffer &chunkdata); + SharedBuffer reassemble(); + +private: + // Key is chunk number, value is data without headers + std::map> chunks; +}; + +/* + A buffer which stores reliable packets and sorts them internally + for fast access to the smallest one. +*/ + +class ReliablePacketBuffer +{ +public: + ReliablePacketBuffer() = default; + + bool getFirstSeqnum(u16& result); + + BufferedPacketPtr popFirst(); + BufferedPacketPtr popSeqnum(u16 seqnum); + void insert(BufferedPacketPtr &p_ptr, u16 next_expected); + + void incrementTimeouts(float dtime); + u32 getTimedOuts(float timeout); + // timeout relative to last resend + std::vector> getResend(float timeout, u32 max_packets); + + void print(); + bool empty(); + u32 size(); + + +private: + typedef std::list::iterator FindResult; + + FindResult findPacketNoLock(u16 seqnum); + + std::list m_list; + + u16 m_oldest_non_answered_ack; + + std::mutex m_list_mutex; +}; + +/* + A buffer for reconstructing split packets +*/ + +class IncomingSplitBuffer +{ +public: + ~IncomingSplitBuffer(); + /* + Returns a reference counted buffer of length != 0 when a full split + packet is constructed. If not, returns one of length 0. + */ + SharedBuffer insert(BufferedPacketPtr &p_ptr, bool reliable); + + void removeUnreliableTimedOuts(float dtime, float timeout); + +private: + // Key is seqnum + std::map m_buf; + + std::mutex m_map_mutex; +}; + +enum ConnectionCommandType{ + CONNCMD_NONE, + CONNCMD_SERVE, + CONNCMD_CONNECT, + CONNCMD_DISCONNECT, + CONNCMD_DISCONNECT_PEER, + CONNCMD_SEND, + CONNCMD_SEND_TO_ALL, + CONCMD_ACK, + CONCMD_CREATE_PEER, + CONNCMD_RESEND_ONE +}; + +// This is very similar to ConnectionEvent +struct ConnectionCommand +{ + const ConnectionCommandType type; + Address address; + session_t peer_id = PEER_ID_INEXISTENT; + u8 channelnum = 0; + Buffer data; + bool reliable = false; + bool raw = false; + + DISABLE_CLASS_COPY(ConnectionCommand); + + static ConnectionCommandPtr serve(Address address); + static ConnectionCommandPtr connect(Address address); + static ConnectionCommandPtr disconnect(); + static ConnectionCommandPtr disconnect_peer(session_t peer_id); + static ConnectionCommandPtr resend_one(session_t peer_id); + static ConnectionCommandPtr send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable); + static ConnectionCommandPtr ack(session_t peer_id, u8 channelnum, const Buffer &data); + static ConnectionCommandPtr createPeer(session_t peer_id, const Buffer &data); + +private: + ConnectionCommand(ConnectionCommandType type_) : + type(type_) {} + + static ConnectionCommandPtr create(ConnectionCommandType type); +}; + +/* maximum window size to use, 0xFFFF is theoretical maximum. don't think about + * touching it, the less you're away from it the more likely data corruption + * will occur + */ +#define MAX_RELIABLE_WINDOW_SIZE 0x8000 +/* starting value for window size */ +#define START_RELIABLE_WINDOW_SIZE 0x400 +/* minimum value for window size */ +#define MIN_RELIABLE_WINDOW_SIZE 0x40 + +class Channel +{ + +public: + u16 readNextIncomingSeqNum(); + u16 incNextIncomingSeqNum(); + + u16 getOutgoingSequenceNumber(bool& successful); + u16 readOutgoingSequenceNumber(); + bool putBackSequenceNumber(u16); + + u16 readNextSplitSeqNum(); + void setNextSplitSeqNum(u16 seqnum); + + // This is for buffering the incoming packets that are coming in + // the wrong order + ReliablePacketBuffer incoming_reliables; + // This is for buffering the sent packets so that the sender can + // re-send them if no ACK is received + ReliablePacketBuffer outgoing_reliables_sent; + + //queued reliable packets + std::queue queued_reliables; + + //queue commands prior splitting to packets + std::deque queued_commands; + + IncomingSplitBuffer incoming_splits; + + Channel() = default; + ~Channel() = default; + + void UpdatePacketLossCounter(unsigned int count); + void UpdatePacketTooLateCounter(); + void UpdateBytesSent(unsigned int bytes,unsigned int packages=1); + void UpdateBytesLost(unsigned int bytes); + void UpdateBytesReceived(unsigned int bytes); + + void UpdateTimers(float dtime); + + float getCurrentDownloadRateKB() + { MutexAutoLock lock(m_internal_mutex); return cur_kbps; }; + float getMaxDownloadRateKB() + { MutexAutoLock lock(m_internal_mutex); return max_kbps; }; + + float getCurrentLossRateKB() + { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; }; + float getMaxLossRateKB() + { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; }; + + float getCurrentIncomingRateKB() + { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; }; + float getMaxIncomingRateKB() + { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; }; + + float getAvgDownloadRateKB() + { MutexAutoLock lock(m_internal_mutex); return avg_kbps; }; + float getAvgLossRateKB() + { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; }; + float getAvgIncomingRateKB() + { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; }; + + u16 getWindowSize() const { return m_window_size; }; + + void setWindowSize(long size) + { + m_window_size = (u16)rangelim(size, MIN_RELIABLE_WINDOW_SIZE, MAX_RELIABLE_WINDOW_SIZE); + } + +private: + std::mutex m_internal_mutex; + u16 m_window_size = MIN_RELIABLE_WINDOW_SIZE; + + u16 next_incoming_seqnum = SEQNUM_INITIAL; + + u16 next_outgoing_seqnum = SEQNUM_INITIAL; + u16 next_outgoing_split_seqnum = SEQNUM_INITIAL; + + unsigned int current_packet_loss = 0; + unsigned int current_packet_too_late = 0; + unsigned int current_packet_successful = 0; + float packet_loss_counter = 0.0f; + + unsigned int current_bytes_transfered = 0; + unsigned int current_bytes_received = 0; + unsigned int current_bytes_lost = 0; + float max_kbps = 0.0f; + float cur_kbps = 0.0f; + float avg_kbps = 0.0f; + float max_incoming_kbps = 0.0f; + float cur_incoming_kbps = 0.0f; + float avg_incoming_kbps = 0.0f; + float max_kbps_lost = 0.0f; + float cur_kbps_lost = 0.0f; + float avg_kbps_lost = 0.0f; + float bpm_counter = 0.0f; + + unsigned int rate_samples = 0; +}; + + +class UDPPeer final : public Peer +{ +public: + + friend class PeerHelper; + friend class ConnectionReceiveThread; + friend class ConnectionSendThread; + friend class Connection; + + UDPPeer(u16 id, Address address, Connection *connection); + virtual ~UDPPeer() = default; + + void PutReliableSendCommand(ConnectionCommandPtr &c, + unsigned int max_packet_size) override; + + bool getAddress(MTProtocols type, Address& toset) override; + + u16 getNextSplitSequenceNumber(u8 channel) override; + void setNextSplitSequenceNumber(u8 channel, u16 seqnum) override; + + SharedBuffer addSplitPacket(u8 channel, BufferedPacketPtr &toadd, + bool reliable) override; + + bool isTimedOut(float timeout, std::string &reason) override; + +protected: + /* + Calculates avg_rtt and resend_timeout. + rtt=-1 only recalculates resend_timeout + */ + void reportRTT(float rtt) override; + + void RunCommandQueues( + unsigned int max_packet_size, + unsigned int maxtransfer); + + float getResendTimeout() + { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; } + + void setResendTimeout(float timeout) + { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; } + + bool Ping(float dtime, SharedBuffer& data) override; + + Channel channels[CHANNEL_COUNT]; + bool m_pending_disconnect = false; +private: + // This is changed dynamically + float resend_timeout = 0.5; + + bool processReliableSendCommand( + ConnectionCommandPtr &c_ptr, + unsigned int max_packet_size); +}; + +} diff --git a/src/network/connectionthreads.h b/src/network/connectionthreads.h index 7e0d44373..ce96e4342 100644 --- a/src/network/connectionthreads.h +++ b/src/network/connectionthreads.h @@ -20,9 +20,13 @@ with this program; if not, write to the Free Software Foundation, Inc., #pragma once +/********************************************/ +/* may only be included from in src/network */ +/********************************************/ + #include #include "threading/thread.h" -#include "connection.h" +#include "connection_internal.h" namespace con { diff --git a/src/unittest/test_connection.cpp b/src/unittest/test_connection.cpp index ea280f56b..4adbc9039 100644 --- a/src/unittest/test_connection.cpp +++ b/src/unittest/test_connection.cpp @@ -23,7 +23,7 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "porting.h" #include "settings.h" #include "util/serialize.h" -#include "network/connection.h" +#include "network/connection_internal.h" #include "network/networkpacket.h" #include "network/socket.h"