/*
Minetest-c55
Copyright (C) 2010 celeron55, Perttu Ahola

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

#include "connection.h"
#include "main.h"
#include "serialization.h"
#include "log.h"
#include "porting.h"
#include "util/serialize.h"
#include "util/numeric.h"
#include "util/string.h"

namespace con
{

/*
	Base header accessors

	The offsets match what makePacket() writes: protocol id (u32) at
	byte 0, sender peer id (u16) at byte 4, channel (u8) at byte 6.
	Neither accessor checks the buffer length; callers must have verified
	that at least the base header is present.
*/

// Returns the sender peer id stored at byte offset 4 of a raw packet
static u16 readPeerId(u8 *packetdata)
{
	return readU16(&packetdata[4]);
}

// Returns the channel number stored at byte offset 6 of a raw packet
static u8 readChannel(u8 *packetdata)
{
	return readU8(&packetdata[6]);
}

/*
	Builds a packet addressed to `address` that consists of the base
	header followed by a copy of `datasize` bytes taken from `data`.

	address:        destination stored in the returned packet
	data, datasize: payload to append after the base header
	protocol_id:    written as u32 at offset 0
	sender_peer_id: written as u16 at offset 4
	channel:        written as u8 at offset 6

	Returns a BufferedPacket of datasize + BASE_HEADER_SIZE bytes.
*/
BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
		u32 protocol_id, u16 sender_peer_id, u8 channel)
{
	u32 packet_size = datasize + BASE_HEADER_SIZE;
	BufferedPacket p(packet_size);
	p.address = address;

	writeU32(&p.data[0], protocol_id);
	writeU16(&p.data[4], sender_peer_id);
	writeU8(&p.data[6], channel);

	// An empty payload is legal. Skip the copy in that case so that no
	// element reference past the end of p.data is formed and memcpy is
	// never handed a possibly-null source pointer.
	if(datasize != 0)
		memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);

	return p;
}

/*
	Convenience overload: same as above, with the payload given as a
	SharedBuffer instead of a pointer/size pair.
*/
BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
		u32 protocol_id, u16 sender_peer_id, u8 channel)
{
	return makePacket(address, *data, data.getSize(),
			protocol_id, sender_peer_id, channel);
}

/*
	Wraps `data` in a TYPE_ORIGINAL packet: a one-byte type header
	followed by a copy of the payload.

	Returns a new buffer of data.getSize() + 1 bytes.
*/
SharedBuffer<u8> makeOriginalPacket(
		SharedBuffer<u8> data)
{
	u32 header_size = 1;
	u32 packet_size = data.getSize() + header_size;
	SharedBuffer<u8> b(packet_size);

	writeU8(&b[0], TYPE_ORIGINAL);

	// Nothing to copy for an empty payload (see makePacket)
	if(data.getSize() != 0)
		memcpy(&b[header_size], *data, data.getSize());

	return b;
}

core::list<SharedBuffer<u8> > makeSplitPacket(
		SharedBuffer<u8> data,
		u32
chunksize_max, u16 seqnum) { // Chunk packets, containing the TYPE_SPLIT header core::list > chunks; u32 chunk_header_size = 7; u32 maximum_data_size = chunksize_max - chunk_header_size; u32 start = 0; // u32 end = 0; // u32 chunk_num = 0; u32 data_size = data.getSize(); //calc once, not in the cycle u32 chunks_count = (u32)(data_size / maximum_data_size) +1; u32 payload_size = maximum_data_size; for(int i=0; i chunk(maximum_data_size + chunk_header_size); writeU8(&chunk[0], TYPE_SPLIT); writeU16(&chunk[1], seqnum); writeU16(&chunk[3], chunks_count); writeU16(&chunk[5], i); // chunk number start = i * maximum_data_size; // 0, 1*dsize, 2*dsize... // on the last iteration if((i+1) == chunks_count) // calc the tail which size smaller than maximum_data_size payload_size = data_size - start; memcpy(&chunk[chunk_header_size], &data[start], payload_size); chunks.push_back(chunk); } return chunks; } core::list > makeAutoSplitPacket( SharedBuffer data, u32 chunksize_max, u16 &split_seqnum) { u32 original_header_size = 1; core::list > list; if(data.getSize() + original_header_size > chunksize_max) { list = makeSplitPacket(data, chunksize_max, split_seqnum); split_seqnum++; return list; } else { list.push_back(makeOriginalPacket(data)); } return list; } SharedBuffer makeReliablePacket( SharedBuffer data, u16 seqnum) { /*dstream<<"BEGIN SharedBuffer makeReliablePacket()"< makeReliablePacket()"<::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); dout_con<::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); /*dout_con<<"findPacket(): finding seqnum="<::Iterator i = m_list.begin(); m_list.erase(i); return p; } BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) { RPBSearchResult r = findPacket(seqnum); if(r == notFound()){ dout_con<<"Not found"<= BASE_HEADER_SIZE+3); u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); assert(type == TYPE_RELIABLE); u16 
seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); // Find the right place for the packet and insert it there // If list is empty, just add it if(m_list.empty()) { m_list.push_back(p); // Done. return; } // Otherwise find the right place core::list::Iterator i; i = m_list.begin(); // Find the first packet in the list which has a higher seqnum for(; i != m_list.end(); i++){ u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); if(s == seqnum){ throw AlreadyExistsException("Same seqnum in list"); } if(seqnum_higher(s, seqnum)){ break; } } // If we're at the end of the list, add the packet to the // end of the list if(i == m_list.end()) { m_list.push_back(p); // Done. return; } // Insert before i m_list.insert_before(i, p); } void ReliablePacketBuffer::incrementTimeouts(float dtime) { core::list::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++){ i->time += dtime; i->totaltime += dtime; } } void ReliablePacketBuffer::resetTimedOuts(float timeout) { core::list::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++){ if(i->time >= timeout) i->time = 0.0; } } bool ReliablePacketBuffer::anyTotaltimeReached(float timeout) { core::list::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++){ if(i->totaltime >= timeout) return true; } return false; } core::list ReliablePacketBuffer::getTimedOuts(float timeout) { core::list timed_outs; core::list::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++) { if(i->time >= timeout) timed_outs.push_back(*i); } return timed_outs; } /* IncomingSplitBuffer */ IncomingSplitBuffer::~IncomingSplitBuffer() { core::map::Iterator i; i = m_buf.getIterator(); for(; i.atEnd() == false; i++) { delete i.getNode()->getValue(); } } /* This will throw a GotSplitPacketException when a full split packet is constructed. 
*/ SharedBuffer IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) { u32 headersize = BASE_HEADER_SIZE + 7; assert(p.data.getSize() >= headersize); u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); assert(type == TYPE_SPLIT); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]); u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]); // Add if doesn't exist if(m_buf.find(seqnum) == NULL) { IncomingSplitPacket *sp = new IncomingSplitPacket(); sp->chunk_count = chunk_count; sp->reliable = reliable; m_buf[seqnum] = sp; } IncomingSplitPacket *sp = m_buf[seqnum]; // TODO: These errors should be thrown or something? Dunno. if(chunk_count != sp->chunk_count) derr_con<<"Connection: WARNING: chunk_count="<chunk_count="<chunk_count <reliable) derr_con<<"Connection: WARNING: reliable="<reliable="<reliable <chunks.find(chunk_num) != NULL) return SharedBuffer(); // Cut chunk data out of packet u32 chunkdatasize = p.data.getSize() - headersize; SharedBuffer chunkdata(chunkdatasize); memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize); // Set chunk data in buffer sp->chunks[chunk_num] = chunkdata; // If not all chunks are received, return empty buffer if(sp->allReceived() == false) return SharedBuffer(); // Calculate total size u32 totalsize = 0; core::map >::Iterator i; i = sp->chunks.getIterator(); for(; i.atEnd() == false; i++) { totalsize += i.getNode()->getValue().getSize(); } SharedBuffer fulldata(totalsize); // Copy chunks to data buffer u32 start = 0; for(u32 chunk_i=0; chunk_ichunk_count; chunk_i++) { SharedBuffer buf = sp->chunks[chunk_i]; u16 chunkdatasize = buf.getSize(); memcpy(&fulldata[start], *buf, chunkdatasize); start += chunkdatasize;; } // Remove sp from buffer m_buf.remove(seqnum); delete sp; return fulldata; } void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) { core::list remove_queue; core::map::Iterator i; i = m_buf.getIterator(); for(; i.atEnd() == false; i++) { 
IncomingSplitPacket *p = i.getNode()->getValue(); // Reliable ones are not removed by timeout if(p->reliable == true) continue; p->time += dtime; if(p->time >= timeout) remove_queue.push_back(i.getNode()->getKey()); } core::list::Iterator j; j = remove_queue.begin(); for(; j != remove_queue.end(); j++) { dout_con<<"NOTE: Removing timed out unreliable split packet" <= 0.0){ if(rtt < 0.01){ if(m_max_packets_per_second < 400) m_max_packets_per_second += 10; } else if(rtt < 0.2){ if(m_max_packets_per_second < 100) m_max_packets_per_second += 2; } else { m_max_packets_per_second *= 0.8; if(m_max_packets_per_second < 10) m_max_packets_per_second = 10; } } if(rtt < -0.999) {} else if(avg_rtt < 0.0) avg_rtt = rtt; else avg_rtt = rtt * 0.1 + avg_rtt * 0.9; // Calculate resend_timeout /*int reliable_count = 0; for(int i=0; i RESEND_TIMEOUT_MAX) timeout = RESEND_TIMEOUT_MAX; resend_timeout = timeout; } /* Connection */ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout): m_protocol_id(protocol_id), m_max_packet_size(max_packet_size), m_timeout(timeout), m_peer_id(0), m_bc_peerhandler(NULL), m_bc_receive_timeout(0), m_indentation(0) { m_socket.setTimeoutMs(5); Start(); } Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, PeerHandler *peerhandler): m_protocol_id(protocol_id), m_max_packet_size(max_packet_size), m_timeout(timeout), m_peer_id(0), m_bc_peerhandler(peerhandler), m_bc_receive_timeout(0), m_indentation(0) { m_socket.setTimeoutMs(5); Start(); } Connection::~Connection() { stop(); // Delete peers for(core::map::Iterator j = m_peers.getIterator(); j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); delete peer; } } /* Internal stuff */ void * Connection::Thread() { ThreadStarted(); log_register_thread("Connection"); dout_con<<"Connection thread started"< 0.1) dtime = 0.1; if(dtime < 0.0) dtime = 0.0; runTimeouts(dtime); while(m_command_queue.size() != 0){ ConnectionCommand c = m_command_queue.pop_front(); 
processCommand(c); } send(dtime); receive(); END_DEBUG_EXCEPTION_HANDLER(derr_con); } return NULL; } void Connection::putEvent(ConnectionEvent &e) { assert(e.type != CONNEVENT_NONE); m_event_queue.push_back(e); } void Connection::processCommand(ConnectionCommand &c) { switch(c.type){ case CONNCMD_NONE: dout_con<::Iterator j = m_peers.getIterator(); j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); peer->m_sendtime_accu += dtime; peer->m_num_sent = 0; peer->m_max_num_sent = peer->m_sendtime_accu * peer->m_max_packets_per_second; } Queue postponed_packets; while(m_outgoing_queue.size() != 0){ OutgoingPacket packet = m_outgoing_queue.pop_front(); Peer *peer = getPeerNoEx(packet.peer_id); if(!peer) continue; if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){ postponed_packets.push_back(packet); } else if(peer->m_num_sent < peer->m_max_num_sent){ rawSendAsPacket(packet.peer_id, packet.channelnum, packet.data, packet.reliable); peer->m_num_sent++; } else { postponed_packets.push_back(packet); } } while(postponed_packets.size() != 0){ m_outgoing_queue.push_back(postponed_packets.pop_front()); } for(core::map::Iterator j = m_peers.getIterator(); j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); peer->m_sendtime_accu -= (float)peer->m_num_sent / peer->m_max_packets_per_second; if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second) peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second; } } // Receive packets from the network and buffers and create ConnectionEvents void Connection::receive() { u32 datasize = m_max_packet_size * 2; // Double it just to be safe // TODO: We can not know how many layers of header there are. // For now, just assume there are no other than the base headers. 
u32 packet_maxsize = datasize + BASE_HEADER_SIZE; SharedBuffer packetdata(packet_maxsize); bool single_wait_done = false; for(;;) { try{ /* Check if some buffer has relevant data */ { u16 peer_id; SharedBuffer resultdata; bool got = getFromBuffers(peer_id, resultdata); if(got){ ConnectionEvent e; e.dataReceived(peer_id, resultdata); putEvent(e); continue; } } if(single_wait_done){ if(m_socket.WaitData(0) == false) break; } single_wait_done = true; Address sender; s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize); if(received_size < 0) break; if(received_size < BASE_HEADER_SIZE) continue; if(readU32(&packetdata[0]) != m_protocol_id) continue; u16 peer_id = readPeerId(*packetdata); u8 channelnum = readChannel(*packetdata); if(channelnum > CHANNEL_COUNT-1){ PrintInfo(derr_con); derr_con<<"Receive(): Invalid channel "<::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); if(peer->has_sent_with_id) continue; if(peer->address == sender) break; } /* If no peer was found with the same address and port, we shall assume it is a new peer and create an entry. */ if(j.atEnd()) { // Pass on to adding the peer } // Else: A peer was found. 
else { Peer *peer = j.getNode()->getValue(); peer_id = peer->id; PrintInfo(derr_con); derr_con<<"WARNING: Assuming unknown peer to be " <<"peer_id="<getValue(); // Validate peer address if(peer->address != sender) { PrintInfo(derr_con); derr_con<<"Peer "<timeout_counter = 0.0; Channel *channel = &(peer->channels[channelnum]); // Throw the received packet to channel->processPacket() // Make a new SharedBuffer from the data without the base headers SharedBuffer strippeddata(received_size - BASE_HEADER_SIZE); memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], strippeddata.getSize()); try{ // Process it (the result is some data with no headers made by us) SharedBuffer resultdata = processPacket (channel, strippeddata, peer_id, channelnum, false); PrintInfo(); dout_con<<"ProcessPacket returned data of size " < timeouted_peers; core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); /* Check peer timeout */ peer->timeout_counter += dtime; if(peer->timeout_counter > m_timeout) { PrintInfo(derr_con); derr_con<<"RunTimeouts(): Peer "<id <<" has timed out." <<" (source=peer->timeout_counter)" <id); // Don't bother going through the buffers of this one continue; } float resend_timeout = peer->resend_timeout; for(u16 i=0; i timed_outs; core::list::Iterator j; Channel *channel = &peer->channels[i]; // Remove timed out incomplete unreliable split packets channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); // Increment reliable packet times channel->outgoing_reliables.incrementTimeouts(dtime); // Check reliable packet total times, remove peer if // over timeout. if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout)) { PrintInfo(derr_con); derr_con<<"RunTimeouts(): Peer "<id <<" has timed out." 
<<" (source=reliable packet totaltime)" <id); goto nextpeer; } // Re-send timed out outgoing reliables timed_outs = channel-> outgoing_reliables.getTimedOuts(resend_timeout); channel->outgoing_reliables.resetTimedOuts(resend_timeout); j = timed_outs.begin(); for(; j != timed_outs.end(); j++) { u16 peer_id = readPeerId(*(j->data)); u8 channel = readChannel(*(j->data)); u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1])); PrintInfo(derr_con); derr_con<<"RE-SENDING timed-out RELIABLE to "; j->address.print(&derr_con); derr_con<<"(t/o="<::Node *node = m_peers.find(PEER_ID_SERVER); if(node != NULL){ throw ConnectionException("Already connected to a server"); } Peer *peer = new Peer(PEER_ID_SERVER, address); m_peers.insert(peer->id, peer); // Create event ConnectionEvent e; e.peerAdded(peer->id, peer->address); putEvent(e); m_socket.Bind(0); // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT m_peer_id = PEER_ID_INEXISTENT; SharedBuffer data(0); Send(PEER_ID_SERVER, 0, data, true); } void Connection::disconnect() { dout_con< data(2); writeU8(&data[0], TYPE_CONTROL); writeU8(&data[1], CONTROLTYPE_DISCO); // Send to all core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); rawSendAsPacket(peer->id, 0, data, false); } } void Connection::sendToAll(u8 channelnum, SharedBuffer data, bool reliable) { core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); send(peer->id, channelnum, data, reliable); } } void Connection::send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable) { dout_con<address, data, m_protocol_id, m_peer_id, channelnum); // Send the packet rawSend(p); } } void Connection::rawSend(const BufferedPacket &packet) { try{ m_socket.Send(packet.address, *packet.data, packet.data.getSize()); } catch(SendFailedException &e){ derr_con<<"Connection::rawSend(): SendFailedException: " <::Node *node = 
m_peers.find(peer_id); if(node == NULL){ throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)"); } // Error checking assert(node->getValue()->id == peer_id); return node->getValue(); } Peer* Connection::getPeerNoEx(u16 peer_id) { core::map::Node *node = m_peers.find(peer_id); if(node == NULL){ return NULL; } // Error checking assert(node->getValue()->id == peer_id); return node->getValue(); } core::list Connection::getPeers() { core::list list; core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); list.push_back(peer); } return list; } bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer &dst) { core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); for(u16 i=0; ichannels[i]; SharedBuffer resultdata; bool got = checkIncomingBuffers(channel, peer_id, resultdata); if(got){ dst = resultdata; return true; } } } return false; } bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id, SharedBuffer &dst) { u16 firstseqnum = 0; // Clear old packets from start of buffer try{ for(;;){ firstseqnum = channel->incoming_reliables.getFirstSeqnum(); if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum)) channel->incoming_reliables.popFirst(); else break; } // This happens if all packets are old }catch(con::NotFoundException) {} if(channel->incoming_reliables.empty() == false) { if(firstseqnum == channel->next_incoming_seqnum) { BufferedPacket p = channel->incoming_reliables.popFirst(); peer_id = readPeerId(*p.data); u8 channelnum = readChannel(*p.data); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); PrintInfo(); dout_con<<"UNBUFFERING TYPE_RELIABLE" <<" seqnum="<outgoing_reliables.print(); dout_con< payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE); memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize()); return payload; } else if(type == TYPE_SPLIT) { // We have to create a packet 
again for buffering // This isn't actually too bad an idea. BufferedPacket packet = makePacket( getPeer(peer_id)->address, packetdata, GetProtocolID(), peer_id, channelnum); // Buffer the packet SharedBuffer data = channel->incoming_splits.insert(packet, reliable); if(data.getSize() != 0) { PrintInfo(); dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, " <<"size="<next_incoming_seqnum); bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum); PrintInfo(); if(is_future_packet) dout_con<<"BUFFERING"; else if(is_old_packet) dout_con<<"OLD"; else dout_con<<"RECUR"; dout_con<<" TYPE_RELIABLE seqnum="<incoming_reliables.size() < 100); // Send a CONTROLTYPE_ACK SharedBuffer reply(4); writeU8(&reply[0], TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_ACK); writeU16(&reply[2], seqnum); rawSendAsPacket(peer_id, channelnum, reply, false); //if(seqnum_higher(seqnum, channel->next_incoming_seqnum)) if(is_future_packet) { /*PrintInfo(); dout_con<<"Buffering reliable packet (seqnum=" <address, packetdata, GetProtocolID(), peer_id, channelnum); try{ channel->incoming_reliables.insert(packet); /*PrintInfo(); dout_con<<"INCOMING: "; channel->incoming_reliables.print(); dout_con<next_incoming_seqnum, seqnum)) else if(is_old_packet) { // An old packet, dump it throw InvalidIncomingDataException("Got an old reliable packet"); } channel->next_incoming_seqnum++; // Get out the inside packet and re-process it SharedBuffer payload(packetdata.getSize() - RELIABLE_HEADER_SIZE); memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize()); return processPacket(channel, payload, peer_id, channelnum, true); } else { PrintInfo(derr_con); derr_con<<"Got invalid type="<<((int)type&0xff)<address); putEvent(e); delete m_peers[peer_id]; m_peers.remove(peer_id); return true; } /* Interface */ ConnectionEvent Connection::getEvent() { if(m_event_queue.size() == 0){ ConnectionEvent e; e.type = CONNEVENT_NONE; return e; } return m_event_queue.pop_front(); } ConnectionEvent 
Connection::waitEvent(u32 timeout_ms) { try{ return m_event_queue.pop_front(timeout_ms); } catch(ItemNotFoundException &ex){ ConnectionEvent e; e.type = CONNEVENT_NONE; return e; } } void Connection::putCommand(ConnectionCommand &c) { m_command_queue.push_back(c); } void Connection::Serve(unsigned short port) { ConnectionCommand c; c.serve(port); putCommand(c); } void Connection::Connect(Address address) { ConnectionCommand c; c.connect(address); putCommand(c); } bool Connection::Connected() { JMutexAutoLock peerlock(m_peers_mutex); if(m_peers.size() != 1) return false; core::map::Node *node = m_peers.find(PEER_ID_SERVER); if(node == NULL) return false; if(m_peer_id == PEER_ID_INEXISTENT) return false; return true; } void Connection::Disconnect() { ConnectionCommand c; c.disconnect(); putCommand(c); } u32 Connection::Receive(u16 &peer_id, SharedBuffer &data) { for(;;){ ConnectionEvent e = waitEvent(m_bc_receive_timeout); if(e.type != CONNEVENT_NONE) dout_con<(e.data); return e.data.getSize(); case CONNEVENT_PEER_ADDED: { Peer tmp(e.peer_id, e.address); if(m_bc_peerhandler) m_bc_peerhandler->peerAdded(&tmp); continue; } case CONNEVENT_PEER_REMOVED: { Peer tmp(e.peer_id, e.address); if(m_bc_peerhandler) m_bc_peerhandler->deletingPeer(&tmp, e.timeout); continue; } case CONNEVENT_BIND_FAILED: throw ConnectionBindFailed("Failed to bind socket " "(port already in use?)"); } } throw NoIncomingDataException("No incoming data"); } void Connection::SendToAll(u8 channelnum, SharedBuffer data, bool reliable) { assert(channelnum < CHANNEL_COUNT); ConnectionCommand c; c.sendToAll(channelnum, data, reliable); putCommand(c); } void Connection::Send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable) { assert(channelnum < CHANNEL_COUNT); ConnectionCommand c; c.send(peer_id, channelnum, data, reliable); putCommand(c); } void Connection::RunTimeouts(float dtime) { // No-op } Address Connection::GetPeerAddress(u16 peer_id) { JMutexAutoLock peerlock(m_peers_mutex); return 
getPeer(peer_id)->address; } float Connection::GetPeerAvgRTT(u16 peer_id) { JMutexAutoLock peerlock(m_peers_mutex); return getPeer(peer_id)->avg_rtt; } void Connection::DeletePeer(u16 peer_id) { ConnectionCommand c; c.deletePeer(peer_id); putCommand(c); } void Connection::PrintInfo(std::ostream &out) { out<