diff --git a/cmd/mtwebmapper/forwardupdates.go b/cmd/mtwebmapper/forwardupdates.go index 4477214..470398c 100644 --- a/cmd/mtwebmapper/forwardupdates.go +++ b/cmd/mtwebmapper/forwardupdates.go @@ -4,23 +4,109 @@ package main -import "net/http" +import ( + "bytes" + "encoding/json" + "log" + "net/http" + + "github.com/gorilla/websocket" +) type websocketForwarder struct { + upgrader *websocket.Upgrader + register chan *connection + unregister chan *connection + broadcast chan map[xz]bool + connections map[*connection]bool +} + +type connection struct { + ws *websocket.Conn + send chan []byte } func newWebsocketForwarder() *websocketForwarder { - return &websocketForwarder{} + upgrader := &websocket.Upgrader{ReadBufferSize: 512, WriteBufferSize: 2048} + return &websocketForwarder{ + upgrader: upgrader, + register: make(chan *connection), + unregister: make(chan *connection), + broadcast: make(chan map[xz]bool), + connections: make(map[*connection]bool)} } func (wsf *websocketForwarder) run() { - // TODO: Implement me! + for { + select { + case c := <-wsf.register: + wsf.connections[c] = true + case c := <-wsf.unregister: + if _, ok := wsf.connections[c]; ok { + delete(wsf.connections, c) + close(c.send) + } + case changes := <-wsf.broadcast: + if len(wsf.connections) == 0 { + continue + } + // Do the JSON encoding this late to save + // some CPU cyles if no client is connected. + xzs := make([]xz, 0, len(changes)) + for xz := range changes { + xzs = append(xzs, xz) + } + var buf bytes.Buffer + encoder := json.NewEncoder(&buf) + if err := encoder.Encode(xzs); err != nil { + log.Printf("encoding changes failed: %s\n", err) + continue + } + msg := buf.Bytes() + for c := range wsf.connections { + select { + case c.send <- msg: + default: + delete(wsf.connections, c) + close(c.send) + } + } + } + } } func (wsf *websocketForwarder) ServeHTTP(rw http.ResponseWriter, r *http.Request) { - // TODO: Implement me! + ws, err := wsf.upgrader.Upgrade(rw, r, nil) + if err != nil { + log.Printf("Cannot upgrade to websocket: %s\n", err) + return + } + c := &connection{ws: ws, send: make(chan []byte, 8)} + wsf.register <- c + defer func() { wsf.unregister <- c }() + go c.writer() + c.reader() } func (wsf *websocketForwarder) BaseTilesUpdated(changes map[xz]bool) { - // TODO: Implement me! + wsf.broadcast <- changes +} + +func (c *connection) writer() { + defer c.ws.Close() + for msg := range c.send { + if c.ws.WriteMessage(websocket.TextMessage, msg) != nil { + break + } + } +} + +func (c *connection) reader() { + defer c.ws.Close() + for { + // Just read the message and ignore it. + if _, _, err := c.ws.ReadMessage(); err != nil { + break + } + } }