Merge branch 'master' into poi-mod

This commit is contained in:
Raimund Renkert
2022-03-07 10:54:17 +01:00
16 changed files with 831 additions and 248 deletions

91
cmd/mtwebmapper/config.go Normal file
View File

@ -0,0 +1,91 @@
// Copyright 2014, 2015, 2022 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"flag"
"time"
"github.com/BurntSushi/toml"
"bitbucket.org/s_l_teichmann/mtsatellite/common"
)
type duration struct {
time.Duration
}
type config struct {
WebPort int `toml:"web_port"`
WebHost string `toml:"web_host"`
WebDir string `toml:"web"`
MapDir string `toml:"map"`
RedisPort int `toml:"redis_port"`
RedisHost string `toml:"redis_host"`
ColorsFile string `toml:"colors"`
BGColor string `toml:"background"`
Workers int `toml:"workers"`
Transparent bool `toml:"transparent"`
TransparentDim float64 `toml:"transparent_dim"`
UpdateHosts string `toml:"update_hosts"`
Websockets bool `toml:"websockets"`
PlayersFIFO string `toml:"players"`
YMin int `toml:"ymin"`
YMax int `toml:"ymax"`
ChangeDuration duration `toml:"change_duration"`
}
func (d *duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
}
func (cfg *config) bindFlags() {
defaultBgColor := common.ColorToHex(common.BackgroundColor)
flag.IntVar(&cfg.WebPort, "web-port", 8808, "port of the web server")
flag.IntVar(&cfg.WebPort, "p", 8808, "port of the web server (shorthand)")
flag.StringVar(&cfg.WebHost, "web-host", "localhost", "address to bind web server")
flag.StringVar(&cfg.WebHost, "h", "localhost", "address to bind web server(shorthand)")
flag.StringVar(&cfg.WebDir, "web", "web", "static served web files.")
flag.StringVar(&cfg.WebDir, "w", "web", "static served web files (shorthand)")
flag.StringVar(&cfg.MapDir, "map", "map", "directory of prerendered tiles")
flag.StringVar(&cfg.MapDir, "m", "map", "directory of prerendered tiles (shorthand)")
flag.StringVar(&cfg.UpdateHosts, "update-hosts", "localhost",
"';' separated list of hosts which are allowed to send map update requests")
flag.StringVar(&cfg.UpdateHosts, "u", "localhost",
"';' separated list of hosts which are allowed to send map update requests (shorthand)")
flag.StringVar(&cfg.RedisHost, "redis-host", "", "address of the backend Redis server")
flag.StringVar(&cfg.RedisHost, "rh", "", "address of the backend Redis server (shorthand)")
flag.IntVar(&cfg.RedisPort, "redis-port", 6379, "port of the backend Redis server")
flag.IntVar(&cfg.RedisPort, "rp", 6379, "port of the backend Redis server (shorthand)")
flag.IntVar(&cfg.Workers, "workers", 1, "number of workers to render tiles")
flag.StringVar(&cfg.ColorsFile, "colors", "colors.txt", "colors used to render map tiles.")
flag.StringVar(&cfg.ColorsFile, "c", "colors.txt", "colors used to render map tiles (shorthand).")
flag.StringVar(&cfg.BGColor, "background", defaultBgColor, "background color")
flag.StringVar(&cfg.BGColor, "bg", defaultBgColor, "background color (shorthand)")
flag.BoolVar(&cfg.Transparent, "transparent", false, "Render transparent blocks.")
flag.BoolVar(&cfg.Transparent, "t", false, "Render transparent blocks (shorthand).")
flag.Float64Var(&cfg.TransparentDim,
"transparent-dim", common.DefaultTransparentDim*100.0,
"Extra dimming of transparent nodes each depth meter in percent.")
flag.Float64Var(&cfg.TransparentDim,
"td", common.DefaultTransparentDim*100.0,
"Extra dimming of transparent nodes each depth meter in percent. (shorthand)")
flag.BoolVar(&cfg.Websockets, "websockets", false, "Forward tile changes to clients via websockets.")
flag.BoolVar(&cfg.Websockets, "ws", false, "Forward tile changes to clients via websockets (shorthand).")
flag.StringVar(&cfg.PlayersFIFO, "players", "", "Path to FIFO file to read active players from.")
flag.StringVar(&cfg.PlayersFIFO, "ps", "", "Path to FIFO file to read active players from (shorthand).")
flag.IntVar(&cfg.YMin, "ymin", common.MinHeight, "Minimum y in blocks.")
flag.IntVar(&cfg.YMax, "ymax", common.MaxHeight, "Maximum y in blocks.")
flag.DurationVar(&cfg.ChangeDuration.Duration,
"change-duration", time.Second, "Duration to aggregate changes. (PG only)")
}
func (cfg *config) load(fname string) error {
_, err := toml.DecodeFile(fname, &cfg)
return err
}

View File

@ -5,7 +5,6 @@
package main
import (
"bytes"
"encoding/json"
"log"
"net/http"
@ -15,10 +14,9 @@ import (
type websocketForwarder struct {
upgrader *websocket.Upgrader
register chan *connection
unregister chan *connection
broadcast chan msg
connections map[*connection]bool
connections map[*connection]struct{}
funcs chan func(*websocketForwarder)
init func(*websocketForwarder, *connection)
}
type connection struct {
@ -26,64 +24,106 @@ type connection struct {
send chan []byte
}
type msg struct {
tiles []xz
pls []*player
}
type (
tilesMsg struct {
Tiles []xz `json:"tiles"`
}
plsMsg struct {
Pls []*player `json:"players"`
}
)
func newWebsocketForwarder() *websocketForwarder {
upgrader := &websocket.Upgrader{ReadBufferSize: 512, WriteBufferSize: 2048}
upgrader := &websocket.Upgrader{
ReadBufferSize: 512,
WriteBufferSize: 2048,
//CheckOrigin: func(*http.Request) bool { return true },
}
return &websocketForwarder{
upgrader: upgrader,
register: make(chan *connection),
unregister: make(chan *connection),
broadcast: make(chan msg),
connections: make(map[*connection]bool)}
connections: make(map[*connection]struct{}),
funcs: make(chan func(*websocketForwarder)),
}
}
func (wsf *websocketForwarder) run() {
for {
select {
case c := <-wsf.register:
wsf.connections[c] = true
case c := <-wsf.unregister:
if _, ok := wsf.connections[c]; ok {
for fn := range wsf.funcs {
fn(wsf)
}
}
func (wsf *websocketForwarder) register(c *connection) {
wsf.funcs <- func(wsf *websocketForwarder) {
wsf.connections[c] = struct{}{}
}
}
func (wsf *websocketForwarder) unregister(c *connection) {
wsf.funcs <- func(wsf *websocketForwarder) {
if _, ok := wsf.connections[c]; ok {
delete(wsf.connections, c)
close(c.send)
}
}
}
func (wsf *websocketForwarder) setInit(init func(*websocketForwarder, *connection)) {
wsf.funcs <- func(wsf *websocketForwarder) {
wsf.init = init
}
}
func (wsf *websocketForwarder) send(m interface{}) {
wsf.funcs <- func(wsf *websocketForwarder) {
if len(wsf.connections) == 0 {
return
}
data, err := json.Marshal(m)
if err != nil {
log.Printf("encoding failed. %v\n", err)
return
}
for c := range wsf.connections {
select {
case c.send <- data:
default:
delete(wsf.connections, c)
close(c.send)
}
case message := <-wsf.broadcast:
if len(wsf.connections) == 0 {
continue
}
encMsg := map[string]interface{}{}
if message.tiles != nil {
encMsg["tiles"] = message.tiles
}
if message.pls != nil {
encMsg["players"] = message.pls
}
var buf bytes.Buffer
encoder := json.NewEncoder(&buf)
if err := encoder.Encode(encMsg); err != nil {
log.Printf("encoding changes failed: %s\n", err)
continue
}
m := buf.Bytes()
for c := range wsf.connections {
select {
case c.send <- m:
default:
delete(wsf.connections, c)
close(c.send)
}
}
}
}
}
func (wsf *websocketForwarder) singleSend(c *connection, m interface{}) {
wsf.funcs <- func(wsf *websocketForwarder) {
_, ok := wsf.connections[c]
if !ok {
return
}
data, err := json.Marshal(m)
if err != nil {
log.Printf("encoding failed. %v\n", err)
return
}
select {
case c.send <- data:
default:
delete(wsf.connections, c)
close(c.send)
}
}
}
func (wsf *websocketForwarder) BaseTilesUpdated(changes []xz) {
wsf.send(&tilesMsg{Tiles: changes})
}
func (wsf *websocketForwarder) BroadcastPlayers(pls []*player) {
wsf.send(&plsMsg{Pls: pls})
}
func (wsf *websocketForwarder) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
ws, err := wsf.upgrader.Upgrade(rw, r, nil)
if err != nil {
@ -91,20 +131,15 @@ func (wsf *websocketForwarder) ServeHTTP(rw http.ResponseWriter, r *http.Request
return
}
c := &connection{ws: ws, send: make(chan []byte, 8)}
wsf.register <- c
defer func() { wsf.unregister <- c }()
wsf.register(c)
defer wsf.unregister(c)
go c.writer()
if wsf.init != nil {
wsf.init(wsf, c)
}
c.reader()
}
func (wsf *websocketForwarder) BaseTilesUpdated(changes []xz) {
wsf.broadcast <- msg{tiles: changes}
}
func (wsf *websocketForwarder) BroadcastPlayers(pls []*player) {
wsf.broadcast <- msg{pls: pls}
}
func (c *connection) writer() {
defer c.ws.Close()
for msg := range c.send {

View File

@ -10,7 +10,6 @@ import (
"log"
"net"
"net/http"
"strings"
"bitbucket.org/s_l_teichmann/mtsatellite/common"
@ -18,64 +17,16 @@ import (
)
func main() {
var (
webPort int
webHost string
webDir string
mapDir string
redisPort int
redisHost string
colorsFile string
bgColor string
workers int
transparent bool
transparentDim float64
updateHosts string
websockets bool
playersFIFO string
version bool
yMin int
yMax int
cfg config
cfgFile string
version bool
)
defaultBgColor := common.ColorToHex(common.BackgroundColor)
flag.IntVar(&webPort, "web-port", 8808, "port of the web server")
flag.IntVar(&webPort, "p", 8808, "port of the web server (shorthand)")
flag.StringVar(&webHost, "web-host", "localhost", "address to bind web server")
flag.StringVar(&webHost, "h", "localhost", "address to bind web server(shorthand)")
flag.StringVar(&webDir, "web", "web", "static served web files.")
flag.StringVar(&webDir, "w", "web", "static served web files (shorthand)")
flag.StringVar(&mapDir, "map", "map", "directory of prerendered tiles")
flag.StringVar(&mapDir, "m", "map", "directory of prerendered tiles (shorthand)")
flag.StringVar(&updateHosts, "update-hosts", "localhost",
"';' separated list of hosts which are allowed to send map update requests")
flag.StringVar(&updateHosts, "u", "localhost",
"';' separated list of hosts which are allowed to send map update requests (shorthand)")
flag.StringVar(&redisHost, "redis-host", "", "address of the backend Redis server")
flag.StringVar(&redisHost, "rh", "", "address of the backend Redis server (shorthand)")
flag.IntVar(&redisPort, "redis-port", 6379, "port of the backend Redis server")
flag.IntVar(&redisPort, "rp", 6379, "port of the backend Redis server (shorthand)")
flag.IntVar(&workers, "workers", 1, "number of workers to render tiles")
flag.StringVar(&colorsFile, "colors", "colors.txt", "colors used to render map tiles.")
flag.StringVar(&colorsFile, "c", "colors.txt", "colors used to render map tiles (shorthand).")
flag.StringVar(&bgColor, "background", defaultBgColor, "background color")
flag.StringVar(&bgColor, "bg", defaultBgColor, "background color (shorthand)")
flag.BoolVar(&transparent, "transparent", false, "Render transparent blocks.")
flag.BoolVar(&transparent, "t", false, "Render transparent blocks (shorthand).")
flag.Float64Var(&transparentDim,
"transparent-dim", common.DefaultTransparentDim*100.0,
"Extra dimming of transparent nodes each depth meter in percent.")
flag.Float64Var(&transparentDim,
"td", common.DefaultTransparentDim*100.0,
"Extra fimming of transparent nodes each depth meter in percent. (shorthand)")
flag.BoolVar(&websockets, "websockets", false, "Forward tile changes to clients via websockets.")
flag.BoolVar(&websockets, "ws", false, "Forward tile changes to clients via websockets (shorthand).")
flag.StringVar(&playersFIFO, "players", "", "Path to FIFO file to read active players from.")
flag.StringVar(&playersFIFO, "ps", "", "Path to FIFO file to read active players from (shorthand).")
flag.IntVar(&yMin, "ymin", common.MinHeight, "Minimum y in blocks.")
flag.IntVar(&yMax, "ymax", common.MaxHeight, "Maximum y in blocks.")
flag.StringVar(&cfgFile, "config", "", "configuration file")
flag.BoolVar(&version, "version", false, "Print version and exit.")
cfg.bindFlags()
flag.Parse()
@ -83,66 +34,80 @@ func main() {
common.PrintVersionAndExit()
}
bg := common.ParseColorDefault(bgColor, common.BackgroundColor)
if cfgFile != "" {
if err := cfg.load(cfgFile); err != nil {
log.Fatalf("error: %v\n", err)
}
}
bg := common.ParseColorDefault(cfg.BGColor, common.BackgroundColor)
router := mux.NewRouter()
subBaseLine := newSubBaseLine(mapDir, bg)
subBaseLine := newSubBaseLine(cfg.MapDir, bg)
router.Path("/map/{z:[0-9]+}/{x:[0-9]+}/{y:[0-9]+}.png").Handler(subBaseLine)
var btu baseTilesUpdates
var wsf *websocketForwarder
if websockets {
if cfg.Websockets {
wsf = newWebsocketForwarder()
go wsf.run()
router.Path("/ws").Methods("GET").Handler(wsf)
btu = wsf
}
if playersFIFO != "" {
plys := newPlayers(playersFIFO, wsf)
if cfg.PlayersFIFO != "" {
plys := newPlayers(cfg.PlayersFIFO, wsf)
wsf.setInit(plys.initConnection)
go plys.run()
router.Path("/players").Methods("GET").Handler(plys)
}
if redisHost != "" {
if cfg.RedisHost != "" {
var colors *common.Colors
var err error
if colors, err = common.ParseColors(colorsFile); err != nil {
if colors, err = common.ParseColors(cfg.ColorsFile); err != nil {
log.Fatalf("ERROR: problem loading colors: %s", err)
}
colors.TransparentDim = common.Clamp32f(
float32(transparentDim/100.0), 0.0, 100.0)
var redisAddress string
if strings.ContainsRune(redisHost, '/') {
redisAddress = redisHost
} else {
redisAddress = fmt.Sprintf("%s:%d", redisHost, redisPort)
float32(cfg.TransparentDim/100.0), 0.0, 100.0)
dbcf, err := common.CreateDBClientFactory(cfg.RedisHost, cfg.RedisPort)
if err != nil {
log.Fatalf("error: %v\n", err)
}
defer dbcf.Close()
var allowedUpdateIps []net.IP
if allowedUpdateIps, err = ipsFromHosts(updateHosts); err != nil {
if allowedUpdateIps, err = ipsFromHosts(cfg.UpdateHosts); err != nil {
log.Fatalf("ERROR: name resolving problem: %s", err)
}
tu := newTileUpdater(
mapDir,
redisAddress,
cfg.MapDir,
dbcf,
allowedUpdateIps,
colors, bg,
yMin, yMax,
transparent,
workers,
cfg.YMin, cfg.YMax,
cfg.Transparent,
cfg.Workers,
btu)
go tu.doUpdates()
router.Path("/update").Methods("POST").Handler(tu)
if pgHost, ok := common.IsPostgreSQL(cfg.RedisHost); btu != nil && ok {
go tu.listen(pgHost, cfg.ChangeDuration.Duration)
} else {
router.Path("/update").Methods("POST").Handler(tu)
}
}
router.PathPrefix("/").Handler(http.FileServer(http.Dir(webDir)))
router.PathPrefix("/").Handler(http.FileServer(http.Dir(cfg.WebDir)))
http.Handle("/", router)
addr := fmt.Sprintf("%s:%d", webHost, webPort)
addr := fmt.Sprintf("%s:%d", cfg.WebHost, cfg.WebPort)
if err := http.ListenAndServe(addr, nil); err != nil {
log.Fatalf("Starting server failed: %s\n", err)
}

View File

@ -7,6 +7,8 @@ package main
import (
"bufio"
"bytes"
"context"
"database/sql"
"encoding/json"
"html/template"
"log"
@ -16,9 +18,14 @@ import (
"sort"
"sync"
"time"
"bitbucket.org/s_l_teichmann/mtsatellite/common"
)
const sleepInterval = time.Second * 5
const (
sleepInterval = time.Second * 5
sleepPG = time.Second
)
var geoJSONTmpl = template.Must(template.New("geojson").Parse(
`{ "type": "Feature",
@ -64,21 +71,39 @@ func (p *player) same(o *player) bool {
math.Abs(p.Z-o.Z) < 0.000001
}
type sortPlayersByName []*player
const selectPlayersSQL = `
SELECT posx/10.0, posy/10.0, posz/10.0, name
FROM player
WHERE modification_date > now() - '2m'::interval`
func (pls sortPlayersByName) Len() int {
return len(pls)
}
func (pls sortPlayersByName) Less(i, j int) bool {
return pls[i].Name < pls[j].Name
}
func (pls sortPlayersByName) Swap(i, j int) {
pls[i], pls[j] = pls[j], pls[i]
func playersFromPostgreSQL(connS string) ([]*player, error) {
time.Sleep(sleepPG)
db, err := sql.Open("pgx", connS)
if err != nil {
return nil, err
}
defer db.Close()
rows, err := db.QueryContext(context.Background(), selectPlayersSQL)
if err != nil {
return nil, err
}
defer rows.Close()
var pls []*player
for rows.Next() {
var p player
if err := rows.Scan(&p.X, &p.Y, &p.Z, &p.Name); err != nil {
return nil, err
}
pls = append(pls, &p)
}
return pls, rows.Err()
}
func (ps *players) readFromFIFO() ([]*player, error) {
if host, ok := common.IsPostgreSQL(ps.fifo); ok {
return playersFromPostgreSQL(host)
}
file, err := os.Open(ps.fifo)
if err != nil {
return nil, err
@ -108,18 +133,22 @@ func samePlayers(a, b []*player) bool {
func (ps *players) run() {
for {
empty := len(ps.current()) == 0
pls, err := ps.readFromFIFO()
if err != nil {
//log.Printf("err: %s\n", err)
time.Sleep(sleepInterval)
continue
}
if pls == nil {
if empty && pls == nil {
//log.Println("no players")
continue
}
//log.Printf("%+q\n", pls)
sort.Sort(sortPlayersByName(pls))
sort.Slice(pls, func(i, j int) bool {
return pls[i].Name < pls[j].Name
})
var change bool
ps.mu.Lock()
//log.Printf("%+q\n", pls)
@ -129,18 +158,28 @@ func (ps *players) run() {
}
ps.mu.Unlock()
if change && ps.wsf != nil {
if pls == nil {
pls = []*player{}
}
// TODO: Throttle this!
ps.wsf.BroadcastPlayers(pls)
}
}
}
func (ps *players) current() []*player {
ps.mu.RLock()
defer ps.mu.RUnlock()
return ps.pls
}
func (ps *players) initConnection(wsf *websocketForwarder, c *connection) {
wsf.singleSend(c, &plsMsg{Pls: ps.current()})
}
func (ps *players) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "application/json")
var pls []*player
ps.mu.RLock()
pls = ps.pls
ps.mu.RUnlock()
pls := ps.current()
encoder := json.NewEncoder(rw)
if err := encoder.Encode(pls); err != nil {
log.Printf("error: sending JSON failed: %s\n", err)

View File

@ -5,6 +5,7 @@
package main
import (
"context"
"encoding/json"
"image"
"image/color"
@ -16,8 +17,10 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/bamiaux/rez"
"github.com/jackc/pgx/v4"
"bytes"
@ -32,18 +35,18 @@ type baseTilesUpdates interface {
}
type tileUpdater struct {
changes map[xz]struct{}
btu baseTilesUpdates
mapDir string
redisAddress string
ips []net.IP
colors *common.Colors
bg color.RGBA
yMin, yMax int16
workers int
transparent bool
cond *sync.Cond
mu sync.Mutex
changes map[xz]struct{}
btu baseTilesUpdates
mapDir string
dbcf common.DBClientFactory
ips []net.IP
colors *common.Colors
bg color.RGBA
yMin, yMax int16
workers int
transparent bool
cond *sync.Cond
mu sync.Mutex
}
type xz struct {
@ -78,7 +81,8 @@ func (c xz) parent() xzm {
}
func newTileUpdater(
mapDir, redisAddress string,
mapDir string,
dbcf common.DBClientFactory,
ips []net.IP,
colors *common.Colors,
bg color.RGBA,
@ -88,17 +92,17 @@ func newTileUpdater(
btu baseTilesUpdates) *tileUpdater {
tu := tileUpdater{
btu: btu,
mapDir: mapDir,
redisAddress: redisAddress,
ips: ips,
changes: map[xz]struct{}{},
colors: colors,
bg: bg,
yMin: int16(yMin),
yMax: int16(yMax),
transparent: transparent,
workers: workers}
btu: btu,
mapDir: mapDir,
dbcf: dbcf,
ips: ips,
changes: map[xz]struct{}{},
colors: colors,
bg: bg,
yMin: int16(yMin),
yMax: int16(yMax),
transparent: transparent,
workers: workers}
tu.cond = sync.NewCond(&tu.mu)
return &tu
}
@ -130,6 +134,60 @@ func (tu *tileUpdater) checkIP(r *http.Request) bool {
return false
}
func (tu *tileUpdater) listen(host string, changeDuration time.Duration) {
xzCh := make(chan xz)
go func() {
ctx := context.Background()
conn, err := pgx.Connect(ctx, host)
if err != nil {
log.Printf("error: %v\n", err)
return
}
defer conn.Close(ctx)
if _, err := conn.Exec(ctx, "listen blockchanges"); err != nil {
log.Printf("error: %v\n", err)
return
}
for {
n, err := conn.WaitForNotification(ctx)
if err != nil {
log.Printf("error: %v\n", err)
continue
}
if n.Payload == "" {
continue
}
var c xz
dec := json.NewDecoder(strings.NewReader(n.Payload))
if err := dec.Decode(&c); err != nil {
log.Printf("error: %v\n", err)
continue
}
xzCh <- c
}
}()
ticker := time.NewTicker(changeDuration)
defer ticker.Stop()
var changes []xz
for {
select {
case c := <-xzCh:
changes = append(changes, c)
case <-ticker.C:
tu.sendChanges(changes)
changes = changes[:0]
}
}
}
func (tu *tileUpdater) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
if !tu.checkIP(r) {
log.Printf("WARN: Unauthorized update request from '%s'\n", r.RemoteAddr)
@ -146,18 +204,23 @@ func (tu *tileUpdater) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
return
}
if len(newChanges) > 0 {
tu.cond.L.Lock()
for _, c := range newChanges {
tu.changes[c.quantize()] = struct{}{}
}
tu.cond.L.Unlock()
tu.cond.Signal()
}
tu.sendChanges(newChanges)
rw.WriteHeader(http.StatusOK)
}
func (tu *tileUpdater) sendChanges(changes []xz) {
if len(changes) == 0 {
return
}
tu.cond.L.Lock()
for _, c := range changes {
tu.changes[c.quantize()] = struct{}{}
}
tu.cond.L.Unlock()
tu.cond.Signal()
}
func extractChanges(changes map[xz]struct{}) []xzc {
chs := make([]xzc, len(changes))
var i int
@ -196,20 +259,14 @@ func (tu *tileUpdater) doUpdates() {
jobs := make(chan *xzc)
var done sync.WaitGroup
var proto string
if strings.ContainsRune(tu.redisAddress, '/') {
proto = "unix"
} else {
proto = "tcp"
}
for i, n := 0, common.Min(tu.workers, len(changes)); i < n; i++ {
var client *common.RedisClient
var client common.DBClient
var err error
if client, err = common.NewRedisClient(proto, tu.redisAddress); err != nil {
if client, err = tu.dbcf.Create(); err != nil {
log.Printf("WARN: Cannot connect to redis server: %s\n", err)
continue
}
btc := common.NewBaseTileCreator(
client, tu.colors, tu.bg,
tu.yMin, tu.yMax,
@ -343,26 +400,30 @@ func (tu *tileUpdater) updateBaseTiles(
update common.BaseTileUpdateFunc) {
type jobWriter struct {
job *xzc
wFn func() (bool, error)
canceled *bool
wFn func() (bool, error)
}
jWs := make(chan jobWriter)
asyncWrite := make(chan struct{})
go func() {
defer close(asyncWrite)
for jw := range jWs {
updated, err := jw.wFn()
if err != nil {
*jw.canceled = true
log.Printf("WARN: writing tile failed: %v.\n", err)
}
if !updated {
jw.job.canceled = true
*jw.canceled = true
}
}
}()
defer func() {
close(jWs)
btc.Close()
done.Done()
}()
@ -374,6 +435,13 @@ func (tu *tileUpdater) updateBaseTiles(
job.canceled = true
continue
}
jWs <- jobWriter{job, btc.WriteFunc(int(job.X), int(job.Z), update)}
jWs <- jobWriter{
&job.canceled,
btc.WriteFunc(int(job.X), int(job.Z), update),
}
}
close(jWs)
// Wait until all tiles are written.
<-asyncWrite
}