mirror of
https://bitbucket.org/s_l_teichmann/mtsatellite
synced 2025-01-08 08:00:17 +01:00
448 lines
8.7 KiB
Go
448 lines
8.7 KiB
Go
// Copyright 2014, 2015 by Sascha L. Teichmann
|
|
// Use of this source code is governed by the MIT license
|
|
// that can be found in the LICENSE file.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"image"
|
|
"image/color"
|
|
"image/draw"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/bamiaux/rez"
|
|
"github.com/jackc/pgx/v4"
|
|
|
|
"bytes"
|
|
|
|
"bitbucket.org/s_l_teichmann/mtsatellite/common"
|
|
)
|
|
|
|
// Number of check sums to keep in memory.
|
|
const maxHashedTiles = 256
|
|
|
|
type baseTilesUpdates interface {
|
|
BaseTilesUpdated([]xz)
|
|
}
|
|
|
|
type tileUpdater struct {
|
|
changes map[xz]struct{}
|
|
btu baseTilesUpdates
|
|
mapDir string
|
|
dbcf common.DBClientFactory
|
|
ips []net.IP
|
|
colors *common.Colors
|
|
bg color.RGBA
|
|
yMin, yMax int16
|
|
workers int
|
|
transparent bool
|
|
cond *sync.Cond
|
|
mu sync.Mutex
|
|
}
|
|
|
|
type xz struct {
|
|
X int16
|
|
Z int16
|
|
}
|
|
|
|
type xzc struct {
|
|
xz
|
|
canceled bool
|
|
}
|
|
|
|
type xzm struct {
|
|
xz
|
|
Mask uint16
|
|
}
|
|
|
|
func (c xz) quantize() xz {
|
|
return xz{X: (c.X - -1933) / 16, Z: (c.Z - -1933) / 16}
|
|
}
|
|
|
|
func (c xz) dequantize() xz {
|
|
return xz{X: c.X*16 + -1933, Z: c.Z*16 + -1933}
|
|
}
|
|
|
|
func (c xz) parent() xzm {
|
|
xp, xr := c.X>>1, uint16(c.X&1)
|
|
zp, zr := c.Z>>1, uint16(c.Z&1)
|
|
return xzm{
|
|
xz{X: xp, Z: zp},
|
|
1 << (zr<<1 | xr)}
|
|
}
|
|
|
|
func newTileUpdater(
|
|
mapDir string,
|
|
dbcf common.DBClientFactory,
|
|
ips []net.IP,
|
|
colors *common.Colors,
|
|
bg color.RGBA,
|
|
yMin, yMax int,
|
|
transparent bool,
|
|
workers int,
|
|
btu baseTilesUpdates) *tileUpdater {
|
|
|
|
tu := tileUpdater{
|
|
btu: btu,
|
|
mapDir: mapDir,
|
|
dbcf: dbcf,
|
|
ips: ips,
|
|
changes: map[xz]struct{}{},
|
|
colors: colors,
|
|
bg: bg,
|
|
yMin: int16(yMin),
|
|
yMax: int16(yMax),
|
|
transparent: transparent,
|
|
workers: workers}
|
|
tu.cond = sync.NewCond(&tu.mu)
|
|
return &tu
|
|
}
|
|
|
|
func (tu *tileUpdater) checkIP(r *http.Request) bool {
|
|
if len(tu.ips) == 0 {
|
|
return true
|
|
}
|
|
|
|
idx := strings.LastIndex(r.RemoteAddr, ":")
|
|
if idx < 0 {
|
|
log.Printf("WARN: cannot extract host from '%s'.\n", r.RemoteAddr)
|
|
return false
|
|
}
|
|
|
|
host := strings.Trim(r.RemoteAddr[:idx], "[]")
|
|
ip := net.ParseIP(host)
|
|
if ip == nil {
|
|
log.Printf("WARN: cannot get IP for host '%s'.\n", host)
|
|
return false
|
|
}
|
|
|
|
for i := range tu.ips {
|
|
if bytes.Compare(tu.ips[i], ip) == 0 {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (tu *tileUpdater) listen(host string, changeDuration time.Duration) {
|
|
|
|
xzCh := make(chan xz)
|
|
|
|
go func() {
|
|
ctx := context.Background()
|
|
|
|
conn, err := pgx.Connect(ctx, host)
|
|
if err != nil {
|
|
log.Printf("error: %v\n", err)
|
|
return
|
|
}
|
|
defer conn.Close(ctx)
|
|
|
|
if _, err := conn.Exec(ctx, "listen blockchanges"); err != nil {
|
|
log.Printf("error: %v\n", err)
|
|
return
|
|
}
|
|
|
|
for {
|
|
n, err := conn.WaitForNotification(ctx)
|
|
if err != nil {
|
|
log.Printf("error: %v\n", err)
|
|
continue
|
|
}
|
|
if n.Payload == "" {
|
|
continue
|
|
}
|
|
var c xz
|
|
dec := json.NewDecoder(strings.NewReader(n.Payload))
|
|
if err := dec.Decode(&c); err != nil {
|
|
log.Printf("error: %v\n", err)
|
|
continue
|
|
}
|
|
xzCh <- c
|
|
}
|
|
}()
|
|
|
|
ticker := time.NewTicker(changeDuration)
|
|
defer ticker.Stop()
|
|
|
|
var changes []xz
|
|
|
|
for {
|
|
select {
|
|
case c := <-xzCh:
|
|
changes = append(changes, c)
|
|
case <-ticker.C:
|
|
tu.sendChanges(changes)
|
|
changes = changes[:0]
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tu *tileUpdater) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
|
if !tu.checkIP(r) {
|
|
log.Printf("WARN: Unauthorized update request from '%s'\n", r.RemoteAddr)
|
|
http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
var err error
|
|
var newChanges []xz
|
|
decoder := json.NewDecoder(r.Body)
|
|
if err = decoder.Decode(&newChanges); err != nil {
|
|
log.Printf("WARN: JSON document broken: %s\n", err)
|
|
http.Error(rw, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
tu.sendChanges(newChanges)
|
|
|
|
rw.WriteHeader(http.StatusOK)
|
|
}
|
|
|
|
func (tu *tileUpdater) sendChanges(changes []xz) {
|
|
if len(changes) == 0 {
|
|
return
|
|
}
|
|
tu.cond.L.Lock()
|
|
for _, c := range changes {
|
|
tu.changes[c.quantize()] = struct{}{}
|
|
}
|
|
tu.cond.L.Unlock()
|
|
tu.cond.Signal()
|
|
}
|
|
|
|
func extractChanges(changes map[xz]struct{}) []xzc {
|
|
chs := make([]xzc, len(changes))
|
|
var i int
|
|
for ch := range changes {
|
|
chs[i] = xzc{ch, false}
|
|
i++
|
|
}
|
|
return chs
|
|
}
|
|
|
|
func activeChanges(changes []xzc) []xz {
|
|
chs := make([]xz, 0, len(changes))
|
|
for i := range changes {
|
|
if !changes[i].canceled {
|
|
chs = append(chs, changes[i].xz)
|
|
}
|
|
}
|
|
return chs
|
|
}
|
|
|
|
func (tu *tileUpdater) doUpdates() {
|
|
|
|
bth := common.NewBaseTileHash(maxHashedTiles)
|
|
|
|
baseDir := filepath.Join(tu.mapDir, "8")
|
|
|
|
for {
|
|
tu.cond.L.Lock()
|
|
for len(tu.changes) == 0 {
|
|
tu.cond.Wait()
|
|
}
|
|
changes := extractChanges(tu.changes)
|
|
tu.changes = map[xz]struct{}{}
|
|
tu.cond.L.Unlock()
|
|
|
|
jobs := make(chan *xzc)
|
|
var done sync.WaitGroup
|
|
|
|
for i, n := 0, common.Min(tu.workers, len(changes)); i < n; i++ {
|
|
var client common.DBClient
|
|
var err error
|
|
if client, err = tu.dbcf.Create(); err != nil {
|
|
log.Printf("WARN: Cannot connect to redis server: %s\n", err)
|
|
continue
|
|
}
|
|
|
|
btc := common.NewBaseTileCreator(
|
|
client, tu.colors, tu.bg,
|
|
tu.yMin, tu.yMax,
|
|
tu.transparent, baseDir)
|
|
done.Add(1)
|
|
go tu.updateBaseTiles(jobs, btc, &done, bth.Update)
|
|
}
|
|
|
|
for i := range changes {
|
|
jobs <- &changes[i]
|
|
}
|
|
close(jobs)
|
|
done.Wait()
|
|
|
|
actChs := activeChanges(changes)
|
|
|
|
if len(actChs) == 0 {
|
|
continue
|
|
}
|
|
|
|
parentJobs := make(map[xz]uint16)
|
|
for i := range actChs {
|
|
pxz := actChs[i].parent()
|
|
parentJobs[pxz.xz] |= pxz.Mask
|
|
}
|
|
|
|
for level := 7; level >= 0; level-- {
|
|
pJobs := make(chan xzm)
|
|
for i, n := 0, common.Min(len(parentJobs), tu.workers); i < n; i++ {
|
|
done.Add(1)
|
|
go tu.updatePyramidTiles(level, pJobs, &done)
|
|
}
|
|
ppJobs := make(map[xz]uint16)
|
|
for c, mask := range parentJobs {
|
|
pJobs <- xzm{c, mask}
|
|
pxz := c.parent()
|
|
ppJobs[pxz.xz] |= pxz.Mask
|
|
}
|
|
close(pJobs)
|
|
done.Wait()
|
|
parentJobs = ppJobs
|
|
}
|
|
|
|
if tu.btu != nil {
|
|
tu.btu.BaseTilesUpdated(actChs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tu *tileUpdater) updatePyramidTiles(
|
|
level int, jobs chan xzm, done *sync.WaitGroup) {
|
|
|
|
defer done.Done()
|
|
scratch := image.NewRGBA(image.Rect(0, 0, 256, 256))
|
|
resized := image.NewRGBA(image.Rect(0, 0, 128, 128))
|
|
|
|
for job := range jobs {
|
|
if err := tu.updatePyramidTile(scratch, resized, level, job); err != nil {
|
|
log.Printf("Updating pyramid tile failed: %s\n", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
(0,0) (128, 0)
|
|
(0, 128) (128, 128)
|
|
*/
|
|
|
|
var dps = [4]image.Point{
|
|
image.Pt(0, 128),
|
|
image.Pt(128, 128),
|
|
image.Pt(0, 0),
|
|
image.Pt(128, 0),
|
|
}
|
|
|
|
var ofs = [4][2]int{
|
|
{0, 0},
|
|
{1, 0},
|
|
{0, 1},
|
|
{1, 1}}
|
|
|
|
var windowSize = image.Pt(128, 128)
|
|
|
|
func (tu *tileUpdater) updatePyramidTile(scratch, resized *image.RGBA, level int, j xzm) error {
|
|
|
|
var orig image.Image
|
|
|
|
origPath := filepath.Join(
|
|
tu.mapDir,
|
|
strconv.Itoa(level),
|
|
strconv.Itoa(int(j.X)),
|
|
strconv.Itoa(int(j.Z))+".png")
|
|
|
|
sr := resized.Bounds()
|
|
levelDir := strconv.Itoa(level + 1)
|
|
for i := uint16(0); i < 4; i++ {
|
|
if j.Mask&(1<<i) != 0 {
|
|
//log.Printf("level %d: modified %d\n", level, i)
|
|
o := ofs[i]
|
|
bx, bz := int(2*j.X), int(2*j.Z)
|
|
path := filepath.Join(
|
|
tu.mapDir,
|
|
levelDir,
|
|
strconv.Itoa(bx+o[0]),
|
|
strconv.Itoa(bz+o[1])+".png")
|
|
img := common.LoadPNG(path, tu.bg)
|
|
if err := rez.Convert(resized, img, common.ResizeFilter); err != nil {
|
|
return err
|
|
}
|
|
r := sr.Sub(sr.Min).Add(dps[i])
|
|
draw.Draw(scratch, r, resized, sr.Min, draw.Src)
|
|
} else {
|
|
// Load lazy
|
|
if orig == nil {
|
|
orig = common.LoadPNG(origPath, tu.bg)
|
|
}
|
|
//log.Printf("level %d: copied %d\n", level, i)
|
|
min := orig.Bounds().Min.Add(dps[i])
|
|
r := image.Rectangle{min, min.Add(windowSize)}
|
|
draw.Draw(scratch, r, orig, min, draw.Src)
|
|
}
|
|
}
|
|
|
|
return common.SaveAsPNGAtomic(origPath, scratch)
|
|
}
|
|
|
|
func (tu *tileUpdater) updateBaseTiles(
|
|
jobs chan *xzc,
|
|
btc *common.BaseTileCreator,
|
|
done *sync.WaitGroup,
|
|
update common.BaseTileUpdateFunc) {
|
|
|
|
type jobWriter struct {
|
|
canceled *bool
|
|
wFn func() (bool, error)
|
|
}
|
|
|
|
jWs := make(chan jobWriter)
|
|
|
|
asyncWrite := make(chan struct{})
|
|
|
|
go func() {
|
|
defer close(asyncWrite)
|
|
|
|
for jw := range jWs {
|
|
updated, err := jw.wFn()
|
|
if err != nil {
|
|
*jw.canceled = true
|
|
log.Printf("WARN: writing tile failed: %v.\n", err)
|
|
}
|
|
if !updated {
|
|
*jw.canceled = true
|
|
}
|
|
}
|
|
}()
|
|
|
|
defer func() {
|
|
btc.Close()
|
|
done.Done()
|
|
}()
|
|
|
|
for job := range jobs {
|
|
xz := job.dequantize()
|
|
if err := btc.RenderArea(xz.X-1, xz.Z-1); err != nil {
|
|
log.Printf("WARN: rendering tile failed: %v.\n", err)
|
|
job.canceled = true
|
|
continue
|
|
}
|
|
jWs <- jobWriter{
|
|
&job.canceled,
|
|
btc.WriteFunc(int(job.X), int(job.Z), update),
|
|
}
|
|
}
|
|
|
|
close(jWs)
|
|
// Wait until all tiles are written.
|
|
<-asyncWrite
|
|
}
|