/* Minetest Copyright (C) 2013 celeron55, Perttu Ahola This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "connection.h" #include "main.h" #include "serialization.h" #include "log.h" #include "porting.h" #include "util/serialize.h" #include "util/numeric.h" #include "util/string.h" #include "settings.h" namespace con { static u16 readPeerId(u8 *packetdata) { return readU16(&packetdata[4]); } static u8 readChannel(u8 *packetdata) { return readU8(&packetdata[6]); } BufferedPacket makePacket(Address &address, u8 *data, u32 datasize, u32 protocol_id, u16 sender_peer_id, u8 channel) { u32 packet_size = datasize + BASE_HEADER_SIZE; BufferedPacket p(packet_size); p.address = address; writeU32(&p.data[0], protocol_id); writeU16(&p.data[4], sender_peer_id); writeU8(&p.data[6], channel); memcpy(&p.data[BASE_HEADER_SIZE], data, datasize); return p; } BufferedPacket makePacket(Address &address, SharedBuffer &data, u32 protocol_id, u16 sender_peer_id, u8 channel) { return makePacket(address, *data, data.getSize(), protocol_id, sender_peer_id, channel); } SharedBuffer makeOriginalPacket( SharedBuffer data) { u32 header_size = 1; u32 packet_size = data.getSize() + header_size; SharedBuffer b(packet_size); writeU8(&b[0], TYPE_ORIGINAL); memcpy(&b[header_size], *data, data.getSize()); return b; } std::list > makeSplitPacket( SharedBuffer data, u32 chunksize_max, u16 seqnum) { // Chunk packets, containing the TYPE_SPLIT header std::list > chunks; u32 chunk_header_size = 7; u32 maximum_data_size = chunksize_max - chunk_header_size; u32 start = 0; u32 end = 0; u32 chunk_num = 0; u16 chunk_count = 0; do{ end = start + maximum_data_size - 1; if(end > data.getSize() - 1) end = data.getSize() - 1; u32 payload_size = end - start + 1; u32 packet_size = chunk_header_size + payload_size; SharedBuffer chunk(packet_size); writeU8(&chunk[0], TYPE_SPLIT); writeU16(&chunk[1], seqnum); // [3] u16 chunk_count is written at next stage writeU16(&chunk[5], chunk_num); memcpy(&chunk[chunk_header_size], &data[start], payload_size); chunks.push_back(chunk); chunk_count++; start = end + 1; chunk_num++; } while(end != data.getSize() - 1); for(std::list >::iterator i = chunks.begin(); i != chunks.end(); ++i) { // Write chunk_count writeU16(&((*i)[3]), chunk_count); } return chunks; } std::list > makeAutoSplitPacket( SharedBuffer data, u32 chunksize_max, u16 &split_seqnum) { u32 original_header_size = 1; std::list > list; if(data.getSize() + original_header_size > chunksize_max) { list = makeSplitPacket(data, chunksize_max, split_seqnum); split_seqnum++; return list; } else { list.push_back(makeOriginalPacket(data)); } return list; } SharedBuffer makeReliablePacket( SharedBuffer data, u16 seqnum) { /*dstream<<"BEGIN SharedBuffer makeReliablePacket()"< makeReliablePacket()"<::iterator i = m_list.begin(); i != m_list.end(); ++i) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); dout_con<::iterator i = m_list.begin(); for(; i != m_list.end(); ++i) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); /*dout_con<<"findPacket(): finding seqnum="<= BASE_HEADER_SIZE+3); u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); assert(type == TYPE_RELIABLE); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); ++m_list_size; // Find the right place for the packet and insert it there // If list is empty, just add it if(m_list.empty()) { m_list.push_back(p); // Done. return; } // Otherwise find the right place std::list::iterator i = m_list.begin(); // Find the first packet in the list which has a higher seqnum for(; i != m_list.end(); ++i){ u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); if(s == seqnum){ --m_list_size; throw AlreadyExistsException("Same seqnum in list"); } if(seqnum_higher(s, seqnum)){ break; } } // If we're at the end of the list, add the packet to the // end of the list if(i == m_list.end()) { m_list.push_back(p); // Done. return; } // Insert before i m_list.insert(i, p); } void ReliablePacketBuffer::incrementTimeouts(float dtime) { for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) { i->time += dtime; i->totaltime += dtime; } } void ReliablePacketBuffer::resetTimedOuts(float timeout) { for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) { if(i->time >= timeout) i->time = 0.0; } } bool ReliablePacketBuffer::anyTotaltimeReached(float timeout) { for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) { if(i->totaltime >= timeout) return true; } return false; } std::list ReliablePacketBuffer::getTimedOuts(float timeout) { std::list timed_outs; for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) { if(i->time >= timeout) timed_outs.push_back(*i); } return timed_outs; } /* IncomingSplitBuffer */ IncomingSplitBuffer::~IncomingSplitBuffer() { for(std::map::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { delete i->second; } } /* This will throw a GotSplitPacketException when a full split packet is constructed. */ SharedBuffer IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) { u32 headersize = BASE_HEADER_SIZE + 7; assert(p.data.getSize() >= headersize); u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); assert(type == TYPE_SPLIT); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]); u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]); // Add if doesn't exist if(m_buf.find(seqnum) == m_buf.end()) { IncomingSplitPacket *sp = new IncomingSplitPacket(); sp->chunk_count = chunk_count; sp->reliable = reliable; m_buf[seqnum] = sp; } IncomingSplitPacket *sp = m_buf[seqnum]; // TODO: These errors should be thrown or something? Dunno. if(chunk_count != sp->chunk_count) derr_con<<"Connection: WARNING: chunk_count="<chunk_count="<chunk_count <reliable) derr_con<<"Connection: WARNING: reliable="<reliable="<reliable <chunks.find(chunk_num) != sp->chunks.end()) return SharedBuffer(); // Cut chunk data out of packet u32 chunkdatasize = p.data.getSize() - headersize; SharedBuffer chunkdata(chunkdatasize); memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize); // Set chunk data in buffer sp->chunks[chunk_num] = chunkdata; // If not all chunks are received, return empty buffer if(sp->allReceived() == false) return SharedBuffer(); // Calculate total size u32 totalsize = 0; for(std::map >::iterator i = sp->chunks.begin(); i != sp->chunks.end(); ++i) { totalsize += i->second.getSize(); } SharedBuffer fulldata(totalsize); // Copy chunks to data buffer u32 start = 0; for(u32 chunk_i=0; chunk_ichunk_count; chunk_i++) { SharedBuffer buf = sp->chunks[chunk_i]; u16 chunkdatasize = buf.getSize(); memcpy(&fulldata[start], *buf, chunkdatasize); start += chunkdatasize;; } // Remove sp from buffer m_buf.erase(seqnum); delete sp; return fulldata; } void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) { std::list remove_queue; for(std::map::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { IncomingSplitPacket *p = i->second; // Reliable ones are not removed by timeout if(p->reliable == true) continue; p->time += dtime; if(p->time >= timeout) remove_queue.push_back(i->first); } for(std::list::iterator j = remove_queue.begin(); j != remove_queue.end(); ++j) { dout_con<<"NOTE: Removing timed out unreliable split packet" <= 0.0){ if(rtt < 0.01){ if(m_max_packets_per_second < congestion_control_max_rate) m_max_packets_per_second += 10; } else if(rtt < congestion_control_aim_rtt){ if(m_max_packets_per_second < congestion_control_max_rate) m_max_packets_per_second += 2; } else { m_max_packets_per_second *= 0.8; if(m_max_packets_per_second < congestion_control_min_rate) m_max_packets_per_second = congestion_control_min_rate; } } if(rtt < -0.999) {} else if(avg_rtt < 0.0) avg_rtt = rtt; else avg_rtt = rtt * 0.1 + avg_rtt * 0.9; // Calculate resend_timeout /*int reliable_count = 0; for(int i=0; i RESEND_TIMEOUT_MAX) timeout = RESEND_TIMEOUT_MAX; resend_timeout = timeout; } /* Connection */ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6): m_protocol_id(protocol_id), m_max_packet_size(max_packet_size), m_timeout(timeout), m_socket(ipv6), m_peer_id(0), m_bc_peerhandler(NULL), m_bc_receive_timeout(0), m_indentation(0) { m_socket.setTimeoutMs(5); Start(); } Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6, PeerHandler *peerhandler): m_protocol_id(protocol_id), m_max_packet_size(max_packet_size), m_timeout(timeout), m_socket(ipv6), m_peer_id(0), m_bc_peerhandler(peerhandler), m_bc_receive_timeout(0), m_indentation(0) { m_socket.setTimeoutMs(5); Start(); } Connection::~Connection() { Stop(); // Delete peers for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { delete j->second; } } /* Internal stuff */ void * Connection::Thread() { ThreadStarted(); log_register_thread("Connection"); dout_con<<"Connection thread started"< 0.1) dtime = 0.1; if(dtime < 0.0) dtime = 0.0; runTimeouts(dtime); while(!m_command_queue.empty()){ ConnectionCommand c = m_command_queue.pop_front(); processCommand(c); } send(dtime); receive(); END_DEBUG_EXCEPTION_HANDLER(derr_con); } return NULL; } void Connection::putEvent(ConnectionEvent &e) { assert(e.type != CONNEVENT_NONE); m_event_queue.push_back(e); } void Connection::processCommand(ConnectionCommand &c) { switch(c.type){ case CONNCMD_NONE: dout_con<::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; peer->m_sendtime_accu += dtime; peer->m_num_sent = 0; peer->m_max_num_sent = peer->m_sendtime_accu * peer->m_max_packets_per_second; } Queue postponed_packets; while(!m_outgoing_queue.empty()){ OutgoingPacket packet = m_outgoing_queue.pop_front(); Peer *peer = getPeerNoEx(packet.peer_id); if(!peer) continue; if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){ postponed_packets.push_back(packet); } else if(peer->m_num_sent < peer->m_max_num_sent){ rawSendAsPacket(packet.peer_id, packet.channelnum, packet.data, packet.reliable); peer->m_num_sent++; } else { postponed_packets.push_back(packet); } } while(!postponed_packets.empty()){ m_outgoing_queue.push_back(postponed_packets.pop_front()); } for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; peer->m_sendtime_accu -= (float)peer->m_num_sent / peer->m_max_packets_per_second; if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second) peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second; } } // Receive packets from the network and buffers and create ConnectionEvents void Connection::receive() { u32 datasize = m_max_packet_size * 2; // Double it just to be safe // TODO: We can not know how many layers of header there are. // For now, just assume there are no other than the base headers. u32 packet_maxsize = datasize + BASE_HEADER_SIZE; SharedBuffer packetdata(packet_maxsize); bool single_wait_done = false; for(u32 loop_i=0; loop_i<1000; loop_i++) // Limit in case of DoS { try{ /* Check if some buffer has relevant data */ { u16 peer_id; SharedBuffer resultdata; bool got = getFromBuffers(peer_id, resultdata); if(got){ ConnectionEvent e; e.dataReceived(peer_id, resultdata); putEvent(e); continue; } } if(single_wait_done){ if(m_socket.WaitData(0) == false) break; } single_wait_done = true; Address sender; s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize); if(received_size < 0) break; if(received_size < BASE_HEADER_SIZE) continue; if(readU32(&packetdata[0]) != m_protocol_id) continue; u16 peer_id = readPeerId(*packetdata); u8 channelnum = readChannel(*packetdata); if(channelnum > CHANNEL_COUNT-1){ PrintInfo(derr_con); derr_con<<"Receive(): Invalid channel "<::iterator j; j = m_peers.begin(); for(; j != m_peers.end(); ++j) { Peer *peer = j->second; if(peer->has_sent_with_id) continue; if(peer->address == sender) break; } /* If no peer was found with the same address and port, we shall assume it is a new peer and create an entry. */ if(j == m_peers.end()) { // Pass on to adding the peer } // Else: A peer was found. else { Peer *peer = j->second; peer_id = peer->id; PrintInfo(derr_con); derr_con<<"WARNING: Assuming unknown peer to be " <<"peer_id="<second; // Validate peer address if(peer->address != sender) { PrintInfo(derr_con); derr_con<<"Peer "<timeout_counter = 0.0; Channel *channel = &(peer->channels[channelnum]); // Throw the received packet to channel->processPacket() // Make a new SharedBuffer from the data without the base headers SharedBuffer strippeddata(received_size - BASE_HEADER_SIZE); memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], strippeddata.getSize()); try{ // Process it (the result is some data with no headers made by us) SharedBuffer resultdata = processPacket (channel, strippeddata, peer_id, channelnum, false); PrintInfo(); dout_con<<"ProcessPacket returned data of size " <getFloat("congestion_control_aim_rtt"); float congestion_control_max_rate = g_settings->getFloat("congestion_control_max_rate"); float congestion_control_min_rate = g_settings->getFloat("congestion_control_min_rate"); std::list timeouted_peers; for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; // Update congestion control values peer->congestion_control_aim_rtt = congestion_control_aim_rtt; peer->congestion_control_max_rate = congestion_control_max_rate; peer->congestion_control_min_rate = congestion_control_min_rate; /* Check peer timeout */ peer->timeout_counter += dtime; if(peer->timeout_counter > m_timeout) { PrintInfo(derr_con); derr_con<<"RunTimeouts(): Peer "<id <<" has timed out." <<" (source=peer->timeout_counter)" <id); // Don't bother going through the buffers of this one continue; } float resend_timeout = peer->resend_timeout; for(u16 i=0; i timed_outs; Channel *channel = &peer->channels[i]; // Remove timed out incomplete unreliable split packets channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); // Increment reliable packet times channel->outgoing_reliables.incrementTimeouts(dtime); // Check reliable packet total times, remove peer if // over timeout. if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout)) { PrintInfo(derr_con); derr_con<<"RunTimeouts(): Peer "<id <<" has timed out." <<" (source=reliable packet totaltime)" <id); goto nextpeer; } // Re-send timed out outgoing reliables timed_outs = channel-> outgoing_reliables.getTimedOuts(resend_timeout); channel->outgoing_reliables.resetTimedOuts(resend_timeout); for(std::list::iterator j = timed_outs.begin(); j != timed_outs.end(); ++j) { u16 peer_id = readPeerId(*(j->data)); u8 channel = readChannel(*(j->data)); u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1])); PrintInfo(derr_con); derr_con<<"RE-SENDING timed-out RELIABLE to "; j->address.print(&derr_con); derr_con<<"(t/o="<::iterator node = m_peers.find(PEER_ID_SERVER); if(node != m_peers.end()){ throw ConnectionException("Already connected to a server"); } Peer *peer = new Peer(PEER_ID_SERVER, address); m_peers[peer->id] = peer; // Create event ConnectionEvent e; e.peerAdded(peer->id, peer->address); putEvent(e); m_socket.Bind(0); // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT m_peer_id = PEER_ID_INEXISTENT; SharedBuffer data(0); Send(PEER_ID_SERVER, 0, data, true); } void Connection::disconnect() { dout_con< data(2); writeU8(&data[0], TYPE_CONTROL); writeU8(&data[1], CONTROLTYPE_DISCO); // Send to all for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; rawSendAsPacket(peer->id, 0, data, false); } } void Connection::sendToAll(u8 channelnum, SharedBuffer data, bool reliable) { for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; send(peer->id, channelnum, data, reliable); } } void Connection::send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable) { dout_con<address, data, m_protocol_id, m_peer_id, channelnum); // Send the packet rawSend(p); } } void Connection::rawSend(const BufferedPacket &packet) { try{ m_socket.Send(packet.address, *packet.data, packet.data.getSize()); } catch(SendFailedException &e){ derr_con<<"Connection::rawSend(): SendFailedException: " <::iterator node = m_peers.find(peer_id); if(node == m_peers.end()){ throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)"); } // Error checking assert(node->second->id == peer_id); return node->second; } Peer* Connection::getPeerNoEx(u16 peer_id) { std::map::iterator node = m_peers.find(peer_id); if(node == m_peers.end()){ return NULL; } // Error checking assert(node->second->id == peer_id); return node->second; } std::list Connection::getPeers() { std::list list; for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; list.push_back(peer); } return list; } bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer &dst) { for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; for(u16 i=0; ichannels[i]; SharedBuffer resultdata; bool got = checkIncomingBuffers(channel, peer_id, resultdata); if(got){ dst = resultdata; return true; } } } return false; } bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id, SharedBuffer &dst) { u16 firstseqnum = 0; // Clear old packets from start of buffer for(;;){ bool found = channel->incoming_reliables.getFirstSeqnum(&firstseqnum); if(!found) break; if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum)) channel->incoming_reliables.popFirst(); else break; } // This happens if all packets are old if(channel->incoming_reliables.empty() == false) { if(firstseqnum == channel->next_incoming_seqnum) { BufferedPacket p = channel->incoming_reliables.popFirst(); peer_id = readPeerId(*p.data); u8 channelnum = readChannel(*p.data); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); PrintInfo(); dout_con<<"UNBUFFERING TYPE_RELIABLE" <<" seqnum="<outgoing_reliables.print(); dout_con< payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE); memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize()); return payload; } else if(type == TYPE_SPLIT) { // We have to create a packet again for buffering // This isn't actually too bad an idea. BufferedPacket packet = makePacket( getPeer(peer_id)->address, packetdata, GetProtocolID(), peer_id, channelnum); // Buffer the packet SharedBuffer data = channel->incoming_splits.insert(packet, reliable); if(data.getSize() != 0) { PrintInfo(); dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, " <<"size="<next_incoming_seqnum); bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum); PrintInfo(); if(is_future_packet) dout_con<<"BUFFERING"; else if(is_old_packet) dout_con<<"OLD"; else dout_con<<"RECUR"; dout_con<<" TYPE_RELIABLE seqnum="<incoming_reliables.size() < 100); // Send a CONTROLTYPE_ACK SharedBuffer reply(4); writeU8(&reply[0], TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_ACK); writeU16(&reply[2], seqnum); rawSendAsPacket(peer_id, channelnum, reply, false); //if(seqnum_higher(seqnum, channel->next_incoming_seqnum)) if(is_future_packet) { /*PrintInfo(); dout_con<<"Buffering reliable packet (seqnum=" <address, packetdata, GetProtocolID(), peer_id, channelnum); try{ channel->incoming_reliables.insert(packet); /*PrintInfo(); dout_con<<"INCOMING: "; channel->incoming_reliables.print(); dout_con<next_incoming_seqnum, seqnum)) else if(is_old_packet) { // An old packet, dump it throw InvalidIncomingDataException("Got an old reliable packet"); } channel->next_incoming_seqnum++; // Get out the inside packet and re-process it SharedBuffer payload(packetdata.getSize() - RELIABLE_HEADER_SIZE); memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize()); return processPacket(channel, payload, peer_id, channelnum, true); } else { PrintInfo(derr_con); derr_con<<"Got invalid type="<<((int)type&0xff)<address); putEvent(e); delete m_peers[peer_id]; m_peers.erase(peer_id); return true; } /* Interface */ ConnectionEvent Connection::getEvent() { if(m_event_queue.empty()){ ConnectionEvent e; e.type = CONNEVENT_NONE; return e; } return m_event_queue.pop_front(); } ConnectionEvent Connection::waitEvent(u32 timeout_ms) { try{ return m_event_queue.pop_front(timeout_ms); } catch(ItemNotFoundException &ex){ ConnectionEvent e; e.type = CONNEVENT_NONE; return e; } } void Connection::putCommand(ConnectionCommand &c) { m_command_queue.push_back(c); } void Connection::Serve(unsigned short port) { ConnectionCommand c; c.serve(port); putCommand(c); } void Connection::Connect(Address address) { ConnectionCommand c; c.connect(address); putCommand(c); } bool Connection::Connected() { JMutexAutoLock peerlock(m_peers_mutex); if(m_peers.size() != 1) return false; std::map::iterator node = m_peers.find(PEER_ID_SERVER); if(node == m_peers.end()) return false; if(m_peer_id == PEER_ID_INEXISTENT) return false; return true; } void Connection::Disconnect() { ConnectionCommand c; c.disconnect(); putCommand(c); } u32 Connection::Receive(u16 &peer_id, SharedBuffer &data) { for(;;){ ConnectionEvent e = waitEvent(m_bc_receive_timeout); if(e.type != CONNEVENT_NONE) dout_con<(e.data); return e.data.getSize(); case CONNEVENT_PEER_ADDED: { Peer tmp(e.peer_id, e.address); if(m_bc_peerhandler) m_bc_peerhandler->peerAdded(&tmp); continue; } case CONNEVENT_PEER_REMOVED: { Peer tmp(e.peer_id, e.address); if(m_bc_peerhandler) m_bc_peerhandler->deletingPeer(&tmp, e.timeout); continue; } case CONNEVENT_BIND_FAILED: throw ConnectionBindFailed("Failed to bind socket " "(port already in use?)"); } } throw NoIncomingDataException("No incoming data"); } void Connection::SendToAll(u8 channelnum, SharedBuffer data, bool reliable) { assert(channelnum < CHANNEL_COUNT); ConnectionCommand c; c.sendToAll(channelnum, data, reliable); putCommand(c); } void Connection::Send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable) { assert(channelnum < CHANNEL_COUNT); ConnectionCommand c; c.send(peer_id, channelnum, data, reliable); putCommand(c); } void Connection::RunTimeouts(float dtime) { // No-op } Address Connection::GetPeerAddress(u16 peer_id) { JMutexAutoLock peerlock(m_peers_mutex); return getPeer(peer_id)->address; } float Connection::GetPeerAvgRTT(u16 peer_id) { JMutexAutoLock peerlock(m_peers_mutex); return getPeer(peer_id)->avg_rtt; } void Connection::DeletePeer(u16 peer_id) { ConnectionCommand c; c.deletePeer(peer_id); putCommand(c); } void Connection::PrintInfo(std::ostream &out) { out<