/* Minetest-c55 Copyright (C) 2010 celeron55, Perttu Ahola This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "connection.h" #include "main.h" #include "serialization.h" namespace con { BufferedPacket makePacket(Address &address, u8 *data, u32 datasize, u32 protocol_id, u16 sender_peer_id, u8 channel) { u32 packet_size = datasize + BASE_HEADER_SIZE; BufferedPacket p(packet_size); p.address = address; writeU32(&p.data[0], protocol_id); writeU16(&p.data[4], sender_peer_id); writeU8(&p.data[6], channel); memcpy(&p.data[BASE_HEADER_SIZE], data, datasize); return p; } BufferedPacket makePacket(Address &address, SharedBuffer &data, u32 protocol_id, u16 sender_peer_id, u8 channel) { return makePacket(address, *data, data.getSize(), protocol_id, sender_peer_id, channel); } SharedBuffer makeOriginalPacket( SharedBuffer data) { u32 header_size = 1; u32 packet_size = data.getSize() + header_size; SharedBuffer b(packet_size); writeU8(&b[0], TYPE_ORIGINAL); memcpy(&b[header_size], *data, data.getSize()); return b; } core::list > makeSplitPacket( SharedBuffer data, u32 chunksize_max, u16 seqnum) { // Chunk packets, containing the TYPE_SPLIT header core::list > chunks; u32 chunk_header_size = 7; u32 maximum_data_size = chunksize_max - chunk_header_size; u32 start = 0; u32 end = 0; u32 chunk_num = 0; do{ end = start + maximum_data_size - 1; if(end > data.getSize() - 1) end = data.getSize() - 1; u32 payload_size = end - start + 1; u32 packet_size = chunk_header_size + payload_size; SharedBuffer chunk(packet_size); writeU8(&chunk[0], TYPE_SPLIT); writeU16(&chunk[1], seqnum); // [3] u16 chunk_count is written at next stage writeU16(&chunk[5], chunk_num); memcpy(&chunk[chunk_header_size], &data[start], payload_size); chunks.push_back(chunk); start = end + 1; chunk_num++; } while(end != data.getSize() - 1); u16 chunk_count = chunks.getSize(); core::list >::Iterator i = chunks.begin(); for(; i != chunks.end(); i++) { // Write chunk_count writeU16(&((*i)[3]), chunk_count); } return chunks; } core::list > makeAutoSplitPacket( SharedBuffer data, u32 chunksize_max, u16 &split_seqnum) { u32 original_header_size = 1; core::list > list; if(data.getSize() + original_header_size > chunksize_max) { list = makeSplitPacket(data, chunksize_max, split_seqnum); split_seqnum++; return list; } else { list.push_back(makeOriginalPacket(data)); } return list; } SharedBuffer makeReliablePacket( SharedBuffer data, u16 seqnum) { /*dstream<<"BEGIN SharedBuffer makeReliablePacket()"< makeReliablePacket()"<::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); dout_con<::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); /*dout_con<<"findPacket(): finding seqnum="<::Iterator i = m_list.begin(); m_list.erase(i); return p; } BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) { RPBSearchResult r = findPacket(seqnum); if(r == notFound()){ dout_con<<"Not found"<= BASE_HEADER_SIZE+3); u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); assert(type == TYPE_RELIABLE); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); // Find the right place for the packet and insert it there // If list is empty, just add it if(m_list.empty()) { m_list.push_back(p); // Done. return; } // Otherwise find the right place core::list::Iterator i; i = m_list.begin(); // Find the first packet in the list which has a higher seqnum for(; i != m_list.end(); i++){ u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); if(s == seqnum){ throw AlreadyExistsException("Same seqnum in list"); } if(seqnum_higher(s, seqnum)){ break; } } // If we're at the end of the list, add the packet to the // end of the list if(i == m_list.end()) { m_list.push_back(p); // Done. return; } // Insert before i m_list.insert_before(i, p); } void ReliablePacketBuffer::incrementTimeouts(float dtime) { core::list::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++){ i->time += dtime; i->totaltime += dtime; } } void ReliablePacketBuffer::resetTimedOuts(float timeout) { core::list::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++){ if(i->time >= timeout) i->time = 0.0; } } bool ReliablePacketBuffer::anyTotaltimeReached(float timeout) { core::list::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++){ if(i->totaltime >= timeout) return true; } return false; } core::list ReliablePacketBuffer::getTimedOuts(float timeout) { core::list timed_outs; core::list::Iterator i; i = m_list.begin(); for(; i != m_list.end(); i++) { if(i->time >= timeout) timed_outs.push_back(*i); } return timed_outs; } /* IncomingSplitBuffer */ IncomingSplitBuffer::~IncomingSplitBuffer() { core::map::Iterator i; i = m_buf.getIterator(); for(; i.atEnd() == false; i++) { delete i.getNode()->getValue(); } } /* This will throw a GotSplitPacketException when a full split packet is constructed. */ void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) { u32 headersize = BASE_HEADER_SIZE + 7; assert(p.data.getSize() >= headersize); u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); assert(type == TYPE_SPLIT); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]); u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]); // Add if doesn't exist if(m_buf.find(seqnum) == NULL) { IncomingSplitPacket *sp = new IncomingSplitPacket(); sp->chunk_count = chunk_count; sp->reliable = reliable; m_buf[seqnum] = sp; } IncomingSplitPacket *sp = m_buf[seqnum]; // TODO: These errors should be thrown or something? Dunno. if(chunk_count != sp->chunk_count) derr_con<<"Connection: WARNING: chunk_count="<chunk_count="<chunk_count <reliable) derr_con<<"Connection: WARNING: reliable="<reliable="<reliable <chunks.find(chunk_num) != NULL) throw AlreadyExistsException("Chunk already in buffer"); // Cut chunk data out of packet u32 chunkdatasize = p.data.getSize() - headersize; SharedBuffer chunkdata(chunkdatasize); memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize); // Set chunk data in buffer sp->chunks[chunk_num] = chunkdata; // If not all chunks are received, return if(sp->allReceived() == false) return; // Calculate total size u32 totalsize = 0; core::map >::Iterator i; i = sp->chunks.getIterator(); for(; i.atEnd() == false; i++) { totalsize += i.getNode()->getValue().getSize(); } SharedBuffer fulldata(totalsize); // Copy chunks to data buffer u32 start = 0; for(u32 chunk_i=0; chunk_ichunk_count; chunk_i++) { SharedBuffer buf = sp->chunks[chunk_i]; u16 chunkdatasize = buf.getSize(); memcpy(&fulldata[start], *buf, chunkdatasize); start += chunkdatasize;; } // Remove sp from buffer m_buf.remove(seqnum); delete sp; throw GotSplitPacketException(fulldata); } void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) { core::list remove_queue; core::map::Iterator i; i = m_buf.getIterator(); for(; i.atEnd() == false; i++) { IncomingSplitPacket *p = i.getNode()->getValue(); // Reliable ones are not removed by timeout if(p->reliable == true) continue; p->time += dtime; if(p->time >= timeout) remove_queue.push_back(i.getNode()->getKey()); } core::list::Iterator j; j = remove_queue.begin(); for(; j != remove_queue.end(); j++) { dout_con<<"NOTE: Removing timed out unreliable split packet" < RESEND_TIMEOUT_MAX) timeout = RESEND_TIMEOUT_MAX; resend_timeout = timeout; } /* Connection */ Connection::Connection( u32 protocol_id, u32 max_packet_size, float timeout, PeerHandler *peerhandler ) { assert(peerhandler != NULL); m_protocol_id = protocol_id; m_max_packet_size = max_packet_size; m_timeout = timeout; m_peer_id = PEER_ID_NEW; //m_waiting_new_peer_id = false; m_indentation = 0; m_peerhandler = peerhandler; } Connection::~Connection() { // Clear peers core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); delete peer; } } void Connection::Serve(unsigned short port) { m_socket.Bind(port); m_peer_id = PEER_ID_SERVER; } void Connection::Connect(Address address) { core::map::Node *node = m_peers.find(PEER_ID_SERVER); if(node != NULL){ throw ConnectionException("Already connected to a server"); } Peer *peer = new Peer(PEER_ID_SERVER, address); m_peers.insert(peer->id, peer); m_peerhandler->peerAdded(peer); m_socket.Bind(0); // Send a dummy packet to server with peer_id = PEER_ID_NEW m_peer_id = PEER_ID_NEW; SharedBuffer data(0); Send(PEER_ID_SERVER, 0, data, true); //m_waiting_new_peer_id = true; } bool Connection::Connected() { if(m_peers.size() != 1) return false; core::map::Node *node = m_peers.find(PEER_ID_SERVER); if(node == NULL) return false; if(m_peer_id == PEER_ID_NEW) return false; return true; } SharedBuffer Channel::ProcessPacket( SharedBuffer packetdata, Connection *con, u16 peer_id, u8 channelnum, bool reliable) { IndentationRaiser iraiser(&(con->m_indentation)); if(packetdata.getSize() < 1) throw InvalidIncomingDataException("packetdata.getSize() < 1"); u8 type = readU8(&packetdata[0]); if(type == TYPE_CONTROL) { if(packetdata.getSize() < 2) throw InvalidIncomingDataException("packetdata.getSize() < 2"); u8 controltype = readU8(&packetdata[1]); if(controltype == CONTROLTYPE_ACK) { if(packetdata.getSize() < 4) throw InvalidIncomingDataException ("packetdata.getSize() < 4 (ACK header size)"); u16 seqnum = readU16(&packetdata[2]); con->PrintInfo(); dout_con<<"Got CONTROLTYPE_ACK: channelnum=" <<((int)channelnum&0xff)<<", peer_id="<PrintInfo(); outgoing_reliables.print(); dout_con<PrintInfo(derr_con); derr_con<<"WARNING: ACKed packet not " "in outgoing queue" <PrintInfo(); dout_con<<"Got new peer id: "<GetPeerID() != PEER_ID_NEW) { con->PrintInfo(derr_con); derr_con<<"WARNING: Not changing" " existing peer id."<SetPeerID(peer_id_new); } throw ProcessedSilentlyException("Got a SET_PEER_ID"); } else if(controltype == CONTROLTYPE_PING) { // Just ignore it, the incoming data already reset // the timeout counter con->PrintInfo(); dout_con<<"PING"<PrintInfo(derr_con); derr_con<<"INVALID TYPE_CONTROL: invalid controltype=" <<((int)controltype&0xff)<PrintInfo(); dout_con<<"RETURNING TYPE_ORIGINAL to user" < payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE); memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize()); return payload; } else if(type == TYPE_SPLIT) { // We have to create a packet again for buffering // This isn't actually too bad an idea. BufferedPacket packet = makePacket( con->GetPeer(peer_id)->address, packetdata, con->GetProtocolID(), peer_id, channelnum); try{ // Buffer the packet incoming_splits.insert(packet, reliable); } // This exception happens when all the pieces of a packet // are collected. catch(GotSplitPacketException &e) { con->PrintInfo(); dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, " <<"size="<PrintInfo(); dout_con<<"BUFFERING TYPE_SPLIT"<PrintInfo(); if(is_future_packet) dout_con<<"BUFFERING"; else if(is_old_packet) dout_con<<"OLD"; else dout_con<<"RECUR"; dout_con<<" TYPE_RELIABLE seqnum="< reply(4); writeU8(&reply[0], TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_ACK); writeU16(&reply[2], seqnum); con->SendAsPacket(peer_id, channelnum, reply, false); //if(seqnum_higher(seqnum, next_incoming_seqnum)) if(is_future_packet) { /*con->PrintInfo(); dout_con<<"Buffering reliable packet (seqnum=" <GetPeer(peer_id)->address, packetdata, con->GetProtocolID(), peer_id, channelnum); try{ incoming_reliables.insert(packet); /*con->PrintInfo(); dout_con<<"INCOMING: "; incoming_reliables.print(); dout_con< payload(packetdata.getSize() - RELIABLE_HEADER_SIZE); memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize()); return ProcessPacket(payload, con, peer_id, channelnum, true); } else { con->PrintInfo(derr_con); derr_con<<"Got invalid type="<<((int)type&0xff)< Channel::CheckIncomingBuffers(Connection *con, u16 &peer_id) { u16 firstseqnum = 0; // Clear old packets from start of buffer try{ for(;;){ firstseqnum = incoming_reliables.getFirstSeqnum(); if(seqnum_higher(next_incoming_seqnum, firstseqnum)) incoming_reliables.popFirst(); else break; } // This happens if all packets are old }catch(con::NotFoundException) {} if(incoming_reliables.empty() == false) { if(firstseqnum == next_incoming_seqnum) { BufferedPacket p = incoming_reliables.popFirst(); peer_id = readPeerId(*p.data); u8 channelnum = readChannel(*p.data); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); con->PrintInfo(); dout_con<<"UNBUFFERING TYPE_RELIABLE" <<" seqnum="< Connection::GetFromBuffers(u16 &peer_id) { core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); for(u16 i=0; ichannels[i]; try{ SharedBuffer resultdata = channel->CheckIncomingBuffers (this, peer_id); return resultdata; } catch(NoIncomingDataException &e) { } catch(InvalidIncomingDataException &e) { } catch(ProcessedSilentlyException &e) { } } } throw NoIncomingDataException("No relevant data in buffers"); } u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize) { /* Receive a packet from the network */ // TODO: We can not know how many layers of header there are. // For now, just assume there are no other than the base headers. u32 packet_maxsize = datasize + BASE_HEADER_SIZE; Buffer packetdata(packet_maxsize); for(;;) { try { /* Check if some buffer has relevant data */ try{ SharedBuffer resultdata = GetFromBuffers(peer_id); if(datasize < resultdata.getSize()) throw InvalidIncomingDataException ("Buffer too small for received data"); memcpy(data, *resultdata, resultdata.getSize()); return resultdata.getSize(); } catch(NoIncomingDataException &e) { } Address sender; s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize); if(received_size < 0) throw NoIncomingDataException("No incoming data"); if(received_size < BASE_HEADER_SIZE) throw InvalidIncomingDataException("No full header received"); if(readU32(&packetdata[0]) != m_protocol_id) throw InvalidIncomingDataException("Invalid protocol id"); peer_id = readPeerId(*packetdata); u8 channelnum = readChannel(*packetdata); if(channelnum > CHANNEL_COUNT-1){ PrintInfo(derr_con); derr_con<<"Receive(): Invalid channel "<::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); if(peer->has_sent_with_id) continue; if(peer->address == sender) break; } /* If no peer was found with the same address and port, we shall assume it is a new peer and create an entry. */ if(j.atEnd()) { // Pass on to adding the peer } // Else: A peer was found. else { Peer *peer = j.getNode()->getValue(); peer_id = peer->id; PrintInfo(derr_con); derr_con<<"WARNING: Assuming unknown peer to be " <<"peer_id="<getValue(); // Validate peer address if(peer->address != sender) { PrintInfo(derr_con); derr_con<<"Peer "<timeout_counter = 0.0; Channel *channel = &(peer->channels[channelnum]); // Throw the received packet to channel->processPacket() // Make a new SharedBuffer from the data without the base headers SharedBuffer strippeddata(received_size - BASE_HEADER_SIZE); memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], strippeddata.getSize()); try{ // Process it (the result is some data with no headers made by us) SharedBuffer resultdata = channel->ProcessPacket (strippeddata, this, peer_id, channelnum); PrintInfo(); dout_con<<"ProcessPacket returned data of size " < data, bool reliable) { core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); Send(peer->id, channelnum, data, reliable); } } void Connection::Send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable) { assert(channelnum < CHANNEL_COUNT); Peer *peer = GetPeer(peer_id); Channel *channel = &(peer->channels[channelnum]); u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE; if(reliable) chunksize_max -= RELIABLE_HEADER_SIZE; core::list > originals; originals = makeAutoSplitPacket(data, chunksize_max, channel->next_outgoing_split_seqnum); core::list >::Iterator i; i = originals.begin(); for(; i != originals.end(); i++) { SharedBuffer original = *i; SendAsPacket(peer_id, channelnum, original, reliable); } } void Connection::SendAsPacket(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable) { Peer *peer = GetPeer(peer_id); Channel *channel = &(peer->channels[channelnum]); if(reliable) { u16 seqnum = channel->next_outgoing_seqnum; channel->next_outgoing_seqnum++; SharedBuffer reliable = makeReliablePacket(data, seqnum); // Add base headers and make a packet BufferedPacket p = makePacket(peer->address, reliable, m_protocol_id, m_peer_id, channelnum); try{ // Buffer the packet channel->outgoing_reliables.insert(p); } catch(AlreadyExistsException &e) { PrintInfo(derr_con); derr_con<<"WARNING: Going to send a reliable packet " "seqnum="<address, data, m_protocol_id, m_peer_id, channelnum); // Send the packet RawSend(p); } } void Connection::RawSend(const BufferedPacket &packet) { m_socket.Send(packet.address, *packet.data, packet.data.getSize()); } void Connection::RunTimeouts(float dtime) { core::list timeouted_peers; core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); /* Check peer timeout */ peer->timeout_counter += dtime; if(peer->timeout_counter > m_timeout) { PrintInfo(derr_con); derr_con<<"RunTimeouts(): Peer "<id <<" has timed out." <<" (source=peer->timeout_counter)" <id); // Don't bother going through the buffers of this one continue; } float resend_timeout = peer->resend_timeout; for(u16 i=0; i timed_outs; core::list::Iterator j; Channel *channel = &peer->channels[i]; // Remove timed out incomplete unreliable split packets channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); // Increment reliable packet times channel->outgoing_reliables.incrementTimeouts(dtime); // Check reliable packet total times, remove peer if // over timeout. if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout)) { PrintInfo(derr_con); derr_con<<"RunTimeouts(): Peer "<id <<" has timed out." <<" (source=reliable packet totaltime)" <id); goto nextpeer; } // Re-send timed out outgoing reliables timed_outs = channel-> outgoing_reliables.getTimedOuts(resend_timeout); channel->outgoing_reliables.resetTimedOuts(resend_timeout); j = timed_outs.begin(); for(; j != timed_outs.end(); j++) { u16 peer_id = readPeerId(*(j->data)); u8 channel = readChannel(*(j->data)); u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1])); PrintInfo(derr_con); derr_con<<"RE-SENDING timed-out RELIABLE to "; j->address.print(&derr_con); derr_con<<"(t/o="<deletingPeer(m_peers[*i], true); delete m_peers[*i]; m_peers.remove(*i); } } Peer* Connection::GetPeer(u16 peer_id) { core::map::Node *node = m_peers.find(peer_id); if(node == NULL){ // Peer not found throw PeerNotFoundException("Peer not found (possible timeout)"); } // Error checking assert(node->getValue()->id == peer_id); return node->getValue(); } Peer* Connection::GetPeerNoEx(u16 peer_id) { core::map::Node *node = m_peers.find(peer_id); if(node == NULL){ return NULL; } // Error checking assert(node->getValue()->id == peer_id); return node->getValue(); } core::list Connection::GetPeers() { core::list list; core::map::Iterator j; j = m_peers.getIterator(); for(; j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); list.push_back(peer); } return list; } void Connection::PrintInfo(std::ostream &out) { out<