WIP: add tile updating in in web mapper with pg listen/notify

This commit is contained in:
Sascha L. Teichmann
2022-02-28 11:07:50 +01:00
parent 4b654672e7
commit d98df1c1cd
5 changed files with 102 additions and 21 deletions

View File

@ -10,6 +10,7 @@ import (
"log"
"net"
"net/http"
"time"
"bitbucket.org/s_l_teichmann/mtsatellite/common"
@ -35,6 +36,7 @@ func main() {
version bool
yMin int
yMax int
changeDuration time.Duration
)
defaultBgColor := common.ColorToHex(common.BackgroundColor)
@ -74,6 +76,7 @@ func main() {
flag.StringVar(&playersFIFO, "ps", "", "Path to FIFO file to read active players from (shorthand).")
flag.IntVar(&yMin, "ymin", common.MinHeight, "Minimum y in blocks.")
flag.IntVar(&yMax, "ymax", common.MaxHeight, "Maximum y in blocks.")
flag.CommandLine.DurationVar(&changeDuration, "change-duration", time.Second, "Duration to aggregate changes. (PG only)")
flag.BoolVar(&version, "version", false, "Print version and exit.")
flag.Parse()
@ -132,7 +135,12 @@ func main() {
workers,
btu)
go tu.doUpdates()
router.Path("/update").Methods("POST").Handler(tu)
if pgHost, ok := common.IsPostgreSQL(redisHost); btu != nil && ok {
go tu.listen(pgHost, changeDuration)
} else {
router.Path("/update").Methods("POST").Handler(tu)
}
}
router.PathPrefix("/").Handler(http.FileServer(http.Dir(webDir)))

View File

@ -5,6 +5,7 @@
package main
import (
"context"
"encoding/json"
"image"
"image/color"
@ -16,8 +17,10 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/bamiaux/rez"
"github.com/jackc/pgx/v4"
"bytes"
@ -131,6 +134,60 @@ func (tu *tileUpdater) checkIP(r *http.Request) bool {
return false
}
func (tu *tileUpdater) listen(host string, changeDuration time.Duration) {
xzCh := make(chan xz)
go func() {
ctx := context.Background()
conn, err := pgx.Connect(ctx, host)
if err != nil {
log.Printf("error: %v\n", err)
return
}
defer conn.Close(ctx)
if _, err := conn.Exec(ctx, "listen blockchanges"); err != nil {
log.Printf("error: %v\n", err)
return
}
for {
n, err := conn.WaitForNotification(ctx)
if err != nil {
log.Printf("error: %v\n", err)
continue
}
if n.Payload == "" {
continue
}
var c xz
dec := json.NewDecoder(strings.NewReader(n.Payload))
if err := dec.Decode(&c); err != nil {
log.Printf("error: %v\n", err)
continue
}
xzCh <- c
}
}()
ticker := time.NewTicker(changeDuration)
defer ticker.Stop()
var changes []xz
for {
select {
case c := <-xzCh:
changes = append(changes, c)
case <-ticker.C:
tu.sendChanges(changes)
changes = changes[:0]
}
}
}
func (tu *tileUpdater) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
if !tu.checkIP(r) {
log.Printf("WARN: Unauthorized update request from '%s'\n", r.RemoteAddr)
@ -147,18 +204,23 @@ func (tu *tileUpdater) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
return
}
if len(newChanges) > 0 {
tu.cond.L.Lock()
for _, c := range newChanges {
tu.changes[c.quantize()] = struct{}{}
}
tu.cond.L.Unlock()
tu.cond.Signal()
}
tu.sendChanges(newChanges)
rw.WriteHeader(http.StatusOK)
}
func (tu *tileUpdater) sendChanges(changes []xz) {
if len(changes) == 0 {
return
}
tu.cond.L.Lock()
for _, c := range changes {
tu.changes[c.quantize()] = struct{}{}
}
tu.cond.L.Unlock()
tu.cond.Signal()
}
func extractChanges(changes map[xz]struct{}) []xzc {
chs := make([]xzc, len(changes))
var i int