mtsatellite/cmd/mtwebmapper/forwardupdates.go

113 lines
2.5 KiB
Go

// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"bytes"
"encoding/json"
"log"
"net/http"
"github.com/gorilla/websocket"
)
// websocketForwarder is an http.Handler that upgrades requests to
// websocket connections and forwards the change sets handed to
// BaseTilesUpdated to every registered connection.
type websocketForwarder struct {
	// upgrader turns incoming HTTP requests into websocket connections.
	upgrader *websocket.Upgrader
	// register and unregister carry connections that run should start
	// or stop tracking.
	register, unregister chan *connection
	// broadcast carries the change sets that run forwards to the clients.
	broadcast chan map[xz]bool
	// connections is the set of currently tracked clients. In this file
	// it is only read and modified from within run.
	connections map[*connection]bool
}
// connection couples one client websocket with the queue of messages
// that are waiting to be written to it.
type connection struct {
	// ws is the underlying websocket of the client.
	ws *websocket.Conn
	// send queues the encoded messages; it is drained by writer and
	// closed by the forwarder when the connection is dropped.
	send chan []byte
}
// newWebsocketForwarder creates a forwarder with no connected clients.
//
// All channels are unbuffered and the event loop is not started here:
// run has to be executed separately, otherwise registering clients and
// BaseTilesUpdated will block.
func newWebsocketForwarder() *websocketForwarder {
	wsf := &websocketForwarder{
		register:    make(chan *connection),
		unregister:  make(chan *connection),
		broadcast:   make(chan map[xz]bool),
		connections: make(map[*connection]bool),
	}
	wsf.upgrader = &websocket.Upgrader{
		ReadBufferSize:  512,
		WriteBufferSize: 2048,
	}
	return wsf
}
// run is the event loop of the forwarder. It never returns and is the
// only place in this file where the connections set is touched, so it
// is meant to be executed in a goroutine of its own.
//
// It handles three kinds of events:
//   - register:   start tracking a connection.
//   - unregister: stop tracking a connection and close its send queue.
//   - broadcast:  encode a change set as JSON and queue it for every
//     tracked connection.
func (wsf *websocketForwarder) run() {
	// drop forgets a connection and closes its queue, which ends the
	// receive loop of the connection's writer.
	drop := func(conn *connection) {
		delete(wsf.connections, conn)
		close(conn.send)
	}

	for {
		select {
		case conn := <-wsf.register:
			wsf.connections[conn] = true

		case conn := <-wsf.unregister:
			// A connection may already have been dropped by a failed
			// broadcast; the lookup prevents closing its queue twice.
			if _, tracked := wsf.connections[conn]; tracked {
				drop(conn)
			}

		case changes := <-wsf.broadcast:
			if len(wsf.connections) == 0 {
				continue
			}
			// Do the JSON encoding this late to save
			// some CPU cycles if no client is connected.
			// Only the keys are forwarded; the bool values are ignored.
			coords := make([]xz, 0, len(changes))
			for coord := range changes {
				coords = append(coords, coord)
			}
			// Encode terminates the JSON document with a newline,
			// which therefore is part of the message.
			var payload bytes.Buffer
			if err := json.NewEncoder(&payload).Encode(coords); err != nil {
				log.Printf("encoding changes failed: %s\n", err)
				continue
			}
			data := payload.Bytes()
			for conn := range wsf.connections {
				select {
				case conn.send <- data:
				default:
					// The queue of this client is full: drop the
					// client instead of blocking everybody else.
					drop(conn)
				}
			}
		}
	}
}
// ServeHTTP upgrades the request to a websocket and keeps serving it
// until reading from the client fails.
//
// If the upgrade fails the error is logged and the handler returns.
// Otherwise the new connection is registered with the forwarder, a
// goroutine is started to write queued messages, and the calling
// goroutine blocks in the read loop. The connection is unregistered
// when the handler returns.
func (wsf *websocketForwarder) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
	sock, err := wsf.upgrader.Upgrade(rw, r, nil)
	if err != nil {
		log.Printf("Cannot upgrade to websocket: %s\n", err)
		return
	}

	// Up to 8 messages may be pending for this client.
	client := &connection{
		ws:   sock,
		send: make(chan []byte, 8),
	}

	wsf.register <- client
	defer func() { wsf.unregister <- client }()

	go client.writer()
	client.reader()
}
// BaseTilesUpdated hands a change set over to the event loop for
// forwarding to the connected clients.
//
// The broadcast channel is created unbuffered by newWebsocketForwarder,
// so this call blocks until run has received the change set. The map is
// passed on as is, without copying.
func (wsf *websocketForwarder) BaseTilesUpdated(changes map[xz]bool) {
	wsf.broadcast <- changes
}
// writer sends the queued messages to the client as text messages.
// It stops when the send queue has been closed or when a write fails,
// and closes the websocket on the way out.
func (c *connection) writer() {
	defer c.ws.Close()
	for {
		data, open := <-c.send
		if !open {
			// Queue was closed: nothing more to deliver.
			return
		}
		if err := c.ws.WriteMessage(websocket.TextMessage, data); err != nil {
			return
		}
	}
}
// reader consumes incoming messages without looking at their content.
// It returns as soon as NextReader reports an error and closes the
// websocket on the way out.
func (c *connection) reader() {
	defer c.ws.Close()
	// Incoming messages are of no interest; keep asking for the next
	// one until the connection reports a failure.
	var err error
	for err == nil {
		_, _, err = c.ws.NextReader()
	}
}