Broadcast player posistion to websocket clients this could be high traffic.

This commit is contained in:
Sascha L. Teichmann 2015-03-09 15:37:57 +01:00
parent 99d4f700be
commit c918b538db
3 changed files with 40 additions and 15 deletions

View File

@ -17,7 +17,7 @@ type websocketForwarder struct {
upgrader *websocket.Upgrader upgrader *websocket.Upgrader
register chan *connection register chan *connection
unregister chan *connection unregister chan *connection
broadcast chan map[xz]bool broadcast chan msg
connections map[*connection]bool connections map[*connection]bool
} }
@ -26,13 +26,18 @@ type connection struct {
send chan []byte send chan []byte
} }
type msg struct {
tiles map[xz]bool
pls []*player
}
func newWebsocketForwarder() *websocketForwarder { func newWebsocketForwarder() *websocketForwarder {
upgrader := &websocket.Upgrader{ReadBufferSize: 512, WriteBufferSize: 2048} upgrader := &websocket.Upgrader{ReadBufferSize: 512, WriteBufferSize: 2048}
return &websocketForwarder{ return &websocketForwarder{
upgrader: upgrader, upgrader: upgrader,
register: make(chan *connection), register: make(chan *connection),
unregister: make(chan *connection), unregister: make(chan *connection),
broadcast: make(chan map[xz]bool), broadcast: make(chan msg),
connections: make(map[*connection]bool)} connections: make(map[*connection]bool)}
} }
@ -46,26 +51,34 @@ func (wsf *websocketForwarder) run() {
delete(wsf.connections, c) delete(wsf.connections, c)
close(c.send) close(c.send)
} }
case changes := <-wsf.broadcast: case message := <-wsf.broadcast:
if len(wsf.connections) == 0 { if len(wsf.connections) == 0 {
continue continue
} }
// Do the JSON encoding this late to save encMsg := map[string]interface{}{}
// some CPU cyles if no client is connected.
xzs := make([]xz, 0, len(changes)) if tiles := message.tiles; tiles != nil {
for xz := range changes { xzs := make([]xz, 0, len(tiles))
xzs = append(xzs, xz) for xz := range tiles {
xzs = append(xzs, xz)
}
encMsg["tiles"] = xzs
} }
if message.pls != nil {
encMsg["players"] = message.pls
}
var buf bytes.Buffer var buf bytes.Buffer
encoder := json.NewEncoder(&buf) encoder := json.NewEncoder(&buf)
if err := encoder.Encode(xzs); err != nil { if err := encoder.Encode(encMsg); err != nil {
log.Printf("encoding changes failed: %s\n", err) log.Printf("encoding changes failed: %s\n", err)
continue continue
} }
msg := buf.Bytes() m := buf.Bytes()
for c := range wsf.connections { for c := range wsf.connections {
select { select {
case c.send <- msg: case c.send <- m:
default: default:
delete(wsf.connections, c) delete(wsf.connections, c)
close(c.send) close(c.send)
@ -89,7 +102,11 @@ func (wsf *websocketForwarder) ServeHTTP(rw http.ResponseWriter, r *http.Request
} }
func (wsf *websocketForwarder) BaseTilesUpdated(changes map[xz]bool) { func (wsf *websocketForwarder) BaseTilesUpdated(changes map[xz]bool) {
wsf.broadcast <- changes wsf.broadcast <- msg{tiles: changes}
}
func (wsf *websocketForwarder) BroadcastPlayers(pls []*player) {
wsf.broadcast <- msg{pls: pls}
} }
func (c *connection) writer() { func (c *connection) writer() {

View File

@ -77,7 +77,10 @@ func (ps *players) run() {
ps.mu.Lock() ps.mu.Lock()
ps.pls = pls ps.pls = pls
ps.mu.Unlock() ps.mu.Unlock()
// TODO: Implement websocket broadcast. if ps.wsf != nil {
// TODO: Throttle this!
ps.wsf.BroadcastPlayers(pls)
}
} }
} }

View File

@ -47,13 +47,18 @@ L.Control.AutoUpdate = L.Control.extend({
if (!(typeof json === "string")) { if (!(typeof json === "string")) {
return invalidateAll; return invalidateAll;
} }
var tiles; var msg;
try { try {
tiles = JSON.parse(json); msg = JSON.parse(json);
} catch (err) { } catch (err) {
return invalidateAll; return invalidateAll;
} }
var tiles = msg.tiles;
if !tiles {
continue;
}
var pyramid = new Array(9); var pyramid = new Array(9);
var last = new Object(); var last = new Object();
pyramid[8] = last; pyramid[8] = last;