Remplissage du dépôt

This commit is contained in:
sys4-fr
2018-12-13 21:09:02 +01:00
commit 6af26530ca
71 changed files with 12648 additions and 0 deletions

View File

@ -0,0 +1,45 @@
// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
// Block is the essential transfer unit from and to the database.
// Key is the serialized spatial position; Data is the serialized
// form of the corresponding block data.
type Block struct {
	Key  []byte
	Data []byte
}

// Session is a database session.
type Session interface {
	// Del deletes a block by a given key.
	Del(hash, key []byte) (bool, error)
	// Fetch fetches the block data for a given position.
	Fetch(hash, key []byte) ([]byte, error)
	// InTransaction returns true if a transaction is running.
	InTransaction() bool
	// Store stores a block with a given position and data.
	Store(hash, key, value []byte) (bool, error)
	// AllKeys returns all keys in the database.
	AllKeys(hash []byte, done <-chan struct{}) (<-chan []byte, int, error)
	// SpatialQuery performs a box query between the positions first and second.
	SpatialQuery(hash, first, second []byte, done <-chan struct{}) (<-chan Block, error)
	// BeginTransaction starts a transaction.
	BeginTransaction() error
	// CommitTransaction finishes a transaction.
	CommitTransaction() error
	// Close closes the database session.
	Close() error
}

// Backend is the interface representing a database.
type Backend interface {
	// NewSession opens a new session.
	NewSession() (Session, error)
	// Shutdown shuts down the database server.
	Shutdown() error
}

View File

@ -0,0 +1,77 @@
// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"encoding/json"
"log"
"net/http"
"sync"
"bytes"
"bitbucket.org/s_l_teichmann/mtsatellite/common"
)
// Pull up if it _really_ produces too much data.
const quantizationFactor = 1
type quantizedXZ struct {
X, Z int16
}
type changeTracker struct {
changes map[quantizedXZ]struct{}
mutex sync.Mutex
}
// newChangeTracker creates an empty, ready-to-use change tracker.
func newChangeTracker() *changeTracker {
	ct := new(changeTracker)
	ct.changes = make(map[quantizedXZ]struct{})
	return ct
}
// BlockChanged records that the block at the given coordinate has
// changed. The coordinate is quantized before being stored.
func (ct *changeTracker) BlockChanged(coord common.Coord) {
	quantized := quantizedXZ{
		X: coord.X / quantizationFactor,
		Z: coord.Z / quantizationFactor,
	}
	ct.mutex.Lock()
	defer ct.mutex.Unlock()
	ct.changes[quantized] = struct{}{}
}
// FlushChanges asynchronously posts all changes recorded since the
// last flush as a JSON array of quantized X/Z positions to the given
// URL. Errors are only logged. If nothing changed, no request is made.
func (ct *changeTracker) FlushChanges(url string) {
	// Swap out the current change set under the lock so new changes
	// can keep arriving while we encode and send the old ones.
	var oldChanges map[quantizedXZ]struct{}
	ct.mutex.Lock()
	if len(ct.changes) > 0 {
		oldChanges = ct.changes
		ct.changes = make(map[quantizedXZ]struct{})
	}
	ct.mutex.Unlock()
	if oldChanges == nil {
		return
	}
	go func() {
		changes := make([]quantizedXZ, 0, len(oldChanges))
		for change := range oldChanges {
			changes = append(changes, change)
		}
		var buf bytes.Buffer
		if err := json.NewEncoder(&buf).Encode(changes); err != nil {
			log.Printf("WARN: encode changes to JSON failed: %s\n", err)
			return
		}
		// Post the encoded buffer directly; copying it through a
		// second bytes.Buffer (as before) was redundant.
		resp, err := http.Post(url, "application/json", &buf)
		if resp != nil {
			defer resp.Body.Close()
		}
		if err != nil {
			log.Printf("WARN: posting changes to %s failed: %s\n", url, err)
		}
	}()
}

View File

@ -0,0 +1,241 @@
// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"bufio"
"log"
"net"
"strconv"
)
// Pre-rendered RESP (Redis serialization protocol) replies.
var (
	redisOk          = []byte("+OK\r\n")
	redisPong        = []byte("+PONG\r\n")
	redisError       = []byte("-ERR\r\n")
	redisNoSuchBlock = []byte("$-1\r\n") // null bulk string
	redisCrnl        = []byte("\r\n")
	redisEmptyArray  = []byte("*0\r\n")
	redisQueued      = []byte("+QUEUED\r\n")
	redisTrue        = []byte(":1\r\n")
	redisFalse       = []byte(":0\r\n")
)
// Connection couples a client network connection with its database
// session and the state needed to answer Redis commands.
type Connection struct {
	conn              net.Conn
	session           Session
	maxBulkStringSize int64  // upper limit for incoming bulk strings (DoS guard)
	boolArray         []bool // HSET results queued while inside MULTI/EXEC
}
// NewConnection creates a handler for a client socket, its database
// session and the maximal accepted bulk string size.
func NewConnection(conn net.Conn, session Session, maxBulkStringSize int64) *Connection {
	c := &Connection{
		conn:              conn,
		session:           session,
		maxBulkStringSize: maxBulkStringSize,
	}
	c.boolArray = []bool{}
	return c
}
// Run serves the client until it disconnects, feeding the incoming
// byte stream through a Redis protocol parser. The session and the
// socket are closed when the client goes away.
func (c *Connection) Run() {
	defer func() {
		c.session.Close()
		c.conn.Close()
	}()
	r := bufio.NewReaderSize(c.conn, 8*1024)
	parser := NewRedisParser(r, c, c.maxBulkStringSize)
	parser.Parse()
	log.Println("client disconnected")
}
// logError logs err if it is non-nil and reports whether the
// operation succeeded (i.e. err was nil).
func logError(err error) bool {
	if err == nil {
		return true
	}
	log.Printf("ERROR: %s\n", err)
	return false
}
// Hdel deletes the block addressed by key and reports the result to
// the client as a Redis integer reply.
func (c *Connection) Hdel(hash, key []byte) bool {
	success, err := c.session.Del(hash, key)
	if err != nil {
		return c.writeError(err)
	}
	return c.writeBool(success)
}
// Hget fetches the block stored under key and sends it to the client
// as a bulk string (a null bulk string if there is no such block).
func (c *Connection) Hget(hash, key []byte) bool {
	data, err := c.session.Fetch(hash, key)
	if err != nil {
		return c.writeError(err)
	}
	return c.writeBlock(data)
}
// Hset stores a block under key. Inside a MULTI transaction the reply
// is queued and the exists flag remembered for EXEC; otherwise the
// flag is sent immediately.
func (c *Connection) Hset(hash, key, data []byte) bool {
	var err error
	var exists bool
	if exists, err = c.session.Store(hash, key, data); err != nil {
		return c.writeError(err)
	}
	if c.session.InTransaction() {
		c.boolArray = append(c.boolArray, exists)
		return c.writeQueued()
	}
	return c.writeBool(exists)
}
// Multi starts a transaction. Starting one while another is running
// is only warned about; the client still receives +OK.
func (c *Connection) Multi() bool {
	if c.session.InTransaction() {
		log.Println("WARN: Already running transaction.")
	} else {
		if err := c.session.BeginTransaction(); err != nil {
			return c.writeError(err)
		}
	}
	return c.writeOk()
}
// Exec commits the running transaction and answers with the array of
// queued HSET results. Without a transaction an empty array is sent.
func (c *Connection) Exec() bool {
	if !c.session.InTransaction() {
		return c.writeEmptyArray()
	}
	// Take the queued results before committing so a failed commit
	// does not leave stale entries behind.
	arr := c.boolArray
	c.boolArray = []bool{}
	if err := c.session.CommitTransaction(); err != nil {
		return c.writeError(err)
	}
	return c.writeBoolArray(arr)
}
// Hkeys streams all keys of the database to the client as an array of
// bulk strings. The deferred close of done tells the session to stop
// producing keys if we bail out early.
func (c *Connection) Hkeys(hash []byte) bool {
	var (
		err  error
		n    int
		keys <-chan []byte
		done = make(chan struct{})
	)
	defer close(done)
	if keys, n, err = c.session.AllKeys(hash, done); err != nil {
		return c.writeError(err)
	}
	if n == 0 {
		return c.writeEmptyArray()
	}
	// Announce the number of elements, then send each key.
	if _, err := c.conn.Write(redisLength('*', n)); err != nil {
		return logError(err)
	}
	for key := range keys {
		if err = c.writeBulkString(key); err != nil {
			return logError(err)
		}
	}
	return true
}
// Ping answers a PING with +PONG.
func (c *Connection) Ping() bool {
	return c.writeMessage(redisPong)
}
// HSpatial streams all blocks inside the box spanned by first and
// second as alternating key/data bulk strings, terminated by a null
// bulk string. Closing done stops the producer if we bail out early.
func (c *Connection) HSpatial(hash, first, second []byte) bool {
	var (
		err    error
		blocks <-chan Block
		done   = make(chan struct{})
	)
	defer close(done)
	if blocks, err = c.session.SpatialQuery(hash, first, second, done); err != nil {
		return c.writeError(err)
	}
	for block := range blocks {
		if err = c.writeBulkString(block.Key); err != nil {
			return logError(err)
		}
		if err = c.writeBulkString(block.Data); err != nil {
			return logError(err)
		}
	}
	// nil data writes the null bulk string as end-of-stream marker.
	return logError(c.writeBulkString(nil))
}
// writeError logs err and sends a generic -ERR to the client.
func (c *Connection) writeError(err error) bool {
	logError(err)
	return c.writeMessage(redisError)
}
// writeEmptyArray sends an empty RESP array (*0).
func (c *Connection) writeEmptyArray() bool {
	return c.writeMessage(redisEmptyArray)
}
// writeBool sends b as a Redis integer reply (:1 or :0).
func (c *Connection) writeBool(b bool) bool {
	msg := redisFalse
	if b {
		msg = redisTrue
	}
	return c.writeMessage(msg)
}
// redisLength renders a RESP length header such as "*12\r\n" or
// "$5\r\n" for the given type prefix and size.
func redisLength(prefix byte, s int) []byte {
	buf := make([]byte, 0, 16)
	buf = append(buf, prefix)
	buf = strconv.AppendInt(buf, int64(s), 10)
	return append(buf, '\r', '\n')
}
// writeBoolArray sends arr as a RESP array of integer replies.
func (c *Connection) writeBoolArray(arr []bool) bool {
	if _, err := c.conn.Write(redisLength('*', len(arr))); err != nil {
		return logError(err)
	}
	for _, b := range arr {
		if !c.writeBool(b) {
			return false
		}
	}
	return true
}
// writeMessage writes a pre-rendered protocol message to the client.
func (c *Connection) writeMessage(msg []byte) bool {
	_, err := c.conn.Write(msg)
	return logError(err)
}
// writeOk sends +OK.
func (c *Connection) writeOk() bool {
	return c.writeMessage(redisOk)
}
// writeQueued sends +QUEUED (the reply for commands inside MULTI).
func (c *Connection) writeQueued() bool {
	return c.writeMessage(redisQueued)
}
// writeBlock sends data as a bulk string and logs any write error.
func (c *Connection) writeBlock(data []byte) bool {
	return logError(c.writeBulkString(data))
}
// writeBulkString writes data as a RESP bulk string. nil data is
// encoded as the null bulk string ($-1).
func (c *Connection) writeBulkString(data []byte) error {
	if data == nil {
		_, err := c.conn.Write(redisNoSuchBlock)
		return err
	}
	if _, err := c.conn.Write(redisLength('$', len(data))); err != nil {
		return err
	}
	if _, err := c.conn.Write(data); err != nil {
		return err
	}
	_, err := c.conn.Write(redisCrnl)
	return err
}

429
cmd/mtredisalize/leveldb.go Normal file
View File

@ -0,0 +1,429 @@
// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"log"
"bitbucket.org/s_l_teichmann/mtsatellite/common"
leveldb "github.com/jmhodges/levigo"
)
// LevelDBBackend is a Backend implementation on top of a LevelDB
// database (via the levigo bindings).
type LevelDBBackend struct {
	cache         *leveldb.Cache // optional LRU block cache; nil if disabled
	db            *leveldb.DB
	interleaved   bool // keys stored interleaved instead of plain
	coverage      *common.Coverage3D // spatial index; only built for plain keys
	encoder       common.KeyTranscoder // storage key -> wire key
	decoder       common.KeyTranscoder // wire key -> storage key
	changeTracker *changeTracker // optional; nil if change notification is off
}
// LevelDBSession is a Session on a LevelDBBackend. A pending
// transaction is accumulated in a WriteBatch until commit.
type LevelDBSession struct {
	backend *LevelDBBackend
	tx      *leveldb.WriteBatch
}
// NewLeveDBBackend opens (or creates) a LevelDB database at path. An
// optional LRU cache of cacheSize MB is attached, and for plain
// (non-interleaved) keys a coverage index is built up front. On any
// failure all resources acquired so far are released again.
func NewLeveDBBackend(
	path string,
	changeTracker *changeTracker,
	interleaved bool,
	cacheSize int) (ldb *LevelDBBackend, err error) {
	opts := leveldb.NewOptions()
	var cache *leveldb.Cache
	if cacheSize > 0 {
		cache = leveldb.NewLRUCache(cacheSize * 1024 * 1024)
		opts.SetCache(cache)
	}
	opts.SetCreateIfMissing(true)
	var db *leveldb.DB
	if db, err = leveldb.Open(path, opts); err != nil {
		if cache != nil {
			cache.Close()
		}
		return
	}
	// Transcoders translate between the wire key form and the form
	// actually stored in this database.
	var (
		encoder common.KeyTranscoder
		decoder common.KeyTranscoder
	)
	if interleaved {
		encoder = common.TranscodeInterleavedToPlain
		decoder = common.TranscodePlainToInterleaved
	} else {
		encoder = common.IdentityTranscoder
		decoder = common.IdentityTranscoder
	}
	ldb = &LevelDBBackend{
		cache:         cache,
		db:            db,
		interleaved:   interleaved,
		encoder:       encoder,
		decoder:       decoder,
		changeTracker: changeTracker,
	}
	if !interleaved {
		if err = ldb.buildCoverage(); err != nil {
			ldb.Shutdown()
			ldb = nil
			return
		}
	}
	return
}
// buildCoverage scans all keys of the database and feeds their
// coordinates into a fresh Coverage3D spatial index. It is only used
// for plain (non-interleaved) key encoding.
func (ldb *LevelDBBackend) buildCoverage() error {
	log.Println("INFO: Start building coverage index (this may take some time)...")
	coverage := common.NewCoverage3D()
	ro := leveldb.NewReadOptions()
	defer ro.Close()
	ro.SetFillCache(false)
	it := ldb.db.NewIterator(ro)
	// levigo iterators hold C resources and must be closed explicitly;
	// the original leaked the iterator here.
	defer it.Close()
	it.SeekToFirst()
	for ; it.Valid(); it.Next() {
		c, err := common.DecodeStringBytesToCoord(it.Key())
		if err != nil {
			return err
		}
		coverage.Insert(c)
	}
	if err := it.GetError(); err != nil {
		return err
	}
	ldb.coverage = coverage
	log.Println("INFO: Finished building coverage index.")
	return nil
}
// NewSession opens a new session sharing this backend's database handle.
func (ldb *LevelDBBackend) NewSession() (Session, error) {
	session := &LevelDBSession{backend: ldb}
	return session, nil
}
// Close releases a pending, uncommitted write batch (if any). The
// batch reference is cleared first so a second Close cannot free the
// same batch twice.
func (ldbs *LevelDBSession) Close() error {
	if tx := ldbs.tx; tx != nil {
		ldbs.tx = nil
		tx.Close()
	}
	return nil
}
// Shutdown closes the database and the optional block cache.
func (ldb *LevelDBBackend) Shutdown() error {
	ldb.db.Close()
	if ldb.cache != nil {
		ldb.cache.Close()
	}
	return nil
}
// Del removes the block with the given key. success reports whether
// the key existed before deletion. The hash argument is unused by
// this backend.
func (ldbs *LevelDBSession) Del(hash, key []byte) (success bool, err error) {
	if key, err = ldbs.backend.decoder(key); err != nil {
		return
	}
	ro := leveldb.NewReadOptions()
	defer ro.Close()
	// Probe first so we can report whether something was deleted.
	var data []byte
	data, err = ldbs.backend.db.Get(ro, key)
	if err != nil {
		return
	}
	if data == nil {
		success = false
		return
	}
	success = true
	wo := leveldb.NewWriteOptions()
	defer wo.Close()
	err = ldbs.backend.db.Delete(wo, key)
	return
}
// Fetch returns the data of the block stored under key, or nil data
// if there is no such block. The hash argument is unused.
func (ldbs *LevelDBSession) Fetch(hash, key []byte) ([]byte, error) {
	decoded, err := ldbs.backend.decoder(key)
	if err != nil {
		return nil, err
	}
	ro := leveldb.NewReadOptions()
	defer ro.Close()
	return ldbs.backend.db.Get(ro, decoded)
}
// InTransaction returns true if a write batch is currently open.
func (ldbs *LevelDBSession) InTransaction() bool {
	return ldbs.tx != nil
}
// keyExists reports whether the database holds an entry for key.
func keyExists(db *leveldb.DB, key []byte) (bool, error) {
	ro := leveldb.NewReadOptions()
	defer ro.Close()
	data, err := db.Get(ro, key)
	if err != nil {
		return false, err
	}
	return data != nil, nil
}
// Store writes a block under the given key and reports whether the
// key already existed. Inside a transaction the write goes into the
// pending batch; otherwise it is applied immediately. Coverage index
// and change tracker are updated as a side effect.
func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) {
	origKey := key
	if key, err = ldbs.backend.decoder(key); err != nil {
		return
	}
	if exists, err = keyExists(ldbs.backend.db, key); err != nil {
		return
	}
	if ldbs.tx != nil {
		ldbs.tx.Put(key, value)
	} else {
		wo := leveldb.NewWriteOptions()
		err = ldbs.backend.db.Put(wo, key, value)
		wo.Close()
		if err != nil {
			return
		}
	}
	// This is technically too early because this is done in transactions
	// which are committed (and possibly fail) later.
	if ldbs.backend.changeTracker != nil || ldbs.backend.coverage != nil {
		c, err := common.DecodeStringBytesToCoord(origKey)
		if err != nil {
			return exists, err
		}
		if ldbs.backend.coverage != nil && !exists {
			ldbs.backend.coverage.Insert(c)
		}
		if ldbs.backend.changeTracker != nil {
			ldbs.backend.changeTracker.BlockChanged(c)
		}
	}
	return
}
// BeginTransaction opens a fresh write batch. A second call while a
// batch is already open is only warned about (consistent with the
// SQLite backend) instead of silently leaking the running batch.
func (ldbs *LevelDBSession) BeginTransaction() error {
	if ldbs.tx != nil {
		log.Println("WARN: Already running transaction.")
		return nil
	}
	ldbs.tx = leveldb.NewWriteBatch()
	return nil
}
// CommitTransaction writes the pending batch synchronously to disk.
// Without a running transaction it only logs a warning.
func (ldbs *LevelDBSession) CommitTransaction() (err error) {
	tx := ldbs.tx
	if tx == nil {
		log.Println("WARN: No transaction running.")
		return
	}
	ldbs.tx = nil
	wo := leveldb.NewWriteOptions()
	wo.SetSync(true) // make the commit durable
	err = ldbs.backend.db.Write(wo, tx)
	wo.Close()
	tx.Close()
	return
}
// AllKeys returns a channel yielding every key in the database
// (re-encoded for the wire) plus the total number of keys. The count
// is determined by a full first pass over the iterator. Closing done
// stops the producing goroutine early.
func (ldbs *LevelDBSession) AllKeys(
	hash []byte,
	done <-chan struct{}) (<-chan []byte, int, error) {
	ro := leveldb.NewReadOptions()
	ro.SetFillCache(false)
	it := ldbs.backend.db.NewIterator(ro)
	it.SeekToFirst()
	// First pass: count the keys so the caller can emit the array header.
	var n int
	for ; it.Valid(); it.Next() {
		n++
	}
	if err := it.GetError(); err != nil {
		it.Close()
		ro.Close()
		return nil, n, err
	}
	keys := make(chan []byte)
	go func() {
		defer ro.Close()
		defer close(keys)
		defer it.Close()
		// Second pass: stream the re-encoded keys.
		it.SeekToFirst()
		encoder := ldbs.backend.encoder
		for ; it.Valid(); it.Next() {
			if key, err := encoder(it.Key()); err == nil {
				select {
				case keys <- key:
				case <-done:
					return
				}
			} else {
				log.Printf("WARN: %s\n", err)
				return
			}
		}
		if err := it.GetError(); err != nil {
			log.Printf("WARN: %s\n", err)
		}
	}()
	return keys, n, nil
}
// SpatialQuery performs a box query between the positions first and
// second, dispatching on the backend's key encoding.
func (ldbs *LevelDBSession) SpatialQuery(
	hash, first, second []byte,
	done <-chan struct{}) (<-chan Block, error) {
	if !ldbs.backend.interleaved {
		return ldbs.plainSpatialQuery(first, second, done)
	}
	return ldbs.interleavedSpatialQuery(first, second, done)
}
// plainSpatialQuery streams all blocks in the box spanned by first
// and second for databases with plain (stringified) keys. It walks
// the coverage index and fetches every candidate position with a
// point Get.
func (ldbs *LevelDBSession) plainSpatialQuery(
	first, second []byte,
	done <-chan struct{}) (<-chan Block, error) {
	var (
		firstKey  int64
		secondKey int64
		err       error
	)
	if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
		return nil, err
	}
	if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
		return nil, err
	}
	c1 := common.PlainToCoord(firstKey)
	c2 := common.PlainToCoord(secondKey)
	c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
	blocks := make(chan Block)
	go func() {
		defer close(blocks)
		ro := leveldb.NewReadOptions()
		defer ro.Close()
		var a, b common.Coord
		for _, r := range ldbs.backend.coverage.Query(c1, c2) {
			a.Z, b.Z = int16(r.Z), int16(r.Z)
			a.X, b.X = int16(r.X1), int16(r.X2)
			for a.Y = r.Y2; a.Y >= r.Y1; a.Y-- {
				b.Y = a.Y
				// The keys in the database are stored and ordered as strings
				// "1", "10", ..., "19", "2", "20", "21" so you cannot use
				// an iterator and assume it is numerically ordered.
				// Each block is fetched with a Get instead.
				for f, t := common.CoordToPlain(a), common.CoordToPlain(b); f <= t; f++ {
					key := common.StringToBytes(f)
					value, err := ldbs.backend.db.Get(ro, key)
					if err != nil {
						log.Printf("get failed: %s\n", err)
						return
					}
					if value != nil {
						select {
						case blocks <- Block{Key: key, Data: value}:
						case <-done:
							return
						}
					}
				}
			}
		}
	}()
	return blocks, nil
}
// interleavedSpatialQuery streams all blocks in the box spanned by
// first and second for databases with interleaved keys. It iterates
// the keyspace in Z-code order and uses BigMin to skip over key
// ranges that lie entirely outside the query box.
func (ldbs *LevelDBSession) interleavedSpatialQuery(
	first, second []byte,
	done <-chan struct{}) (<-chan Block, error) {
	var (
		firstKey  int64
		secondKey int64
		err       error
	)
	if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
		return nil, err
	}
	if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
		return nil, err
	}
	c1 := common.ClipCoord(common.PlainToCoord(firstKey))
	c2 := common.ClipCoord(common.PlainToCoord(secondKey))
	c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
	blocks := make(chan Block)
	go func() {
		defer close(blocks)
		ro := leveldb.NewReadOptions()
		defer ro.Close()
		ro.SetFillCache(false)
		it := ldbs.backend.db.NewIterator(ro)
		defer it.Close()
		zmin, zmax := common.CoordToInterleaved(c1), common.CoordToInterleaved(c2)
		// Should not be necessary.
		zmin, zmax = common.Order64(zmin, zmax)
		var (
			cub        = common.Cuboid{P1: c1, P2: c2}
			err        error
			encodedKey []byte
		)
		it.Seek(common.ToBigEndian(zmin))
		for it.Valid() {
			zcode := common.FromBigEndian(it.Key())
			if zcode > zmax {
				break
			}
			if c := common.InterleavedToCoord(zcode); cub.Contains(c) {
				// Inside the box: emit the block under its plain wire key.
				if encodedKey, err = common.EncodeStringToBytes(common.CoordToPlain(c)); err != nil {
					log.Printf("error encoding key: %s\n", err)
					return
				}
				select {
				case blocks <- Block{Key: encodedKey, Data: it.Value()}:
				case <-done:
					return
				}
				it.Next()
			} else {
				// Outside the box: jump to the next candidate Z-code.
				next := common.BigMin(zmin, zmax, zcode)
				it.Seek(common.ToBigEndian(next))
			}
		}
		if err = it.GetError(); err != nil {
			log.Printf("error while iterating: %s\n", err)
			return
		}
	}()
	return blocks, nil
}

177
cmd/mtredisalize/main.go Normal file
View File

@ -0,0 +1,177 @@
// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"syscall"
	"time"

	"bitbucket.org/s_l_teichmann/mtsatellite/common"
)
// Server defaults.
const (
	defaultMaxBulkStringSize = 32 * 1024 * 1024 // per-input bulk string limit (bytes)
	defaultGCDuration        = "24h"            // period between forced GCs
	defaultChangeDuration    = "30s"            // aggregation window for change notifications
)
// usage prints a short help text for the command line flags to stderr.
func usage() {
	out := os.Stderr
	fmt.Fprintf(out, "Usage: %s [<options>] <database>\n", os.Args[0])
	fmt.Fprintln(out, "Options:")
	flag.PrintDefaults()
}
// main parses the command line, opens the chosen database backend and
// serves Redis-style requests until interrupted or terminated.
func main() {
	var (
		port              int
		host              string
		driver            string
		cacheSize         int
		version           bool
		interleaved       bool
		changeURL         string
		gcDuration        string
		changeDuration    string
		maxBulkStringSize int64
	)
	flag.Usage = usage
	flag.IntVar(&port, "port", 6379, "port to bind")
	flag.StringVar(&driver, "driver", "leveldb", "type of database (leveldb, sqlite)")
	flag.StringVar(&host, "host", "", "host to bind")
	flag.IntVar(&cacheSize, "cache", 32, "cache size in MB")
	flag.BoolVar(&version, "version", false, "Print version and exit.")
	flag.BoolVar(&interleaved,
		"interleaved", false, "Backend stores key in interleaved form.")
	flag.StringVar(&gcDuration,
		"gc-duration", defaultGCDuration, "Duration between forced GCs.")
	flag.StringVar(&changeDuration,
		"change-duration", defaultChangeDuration, "Duration to aggregate changes.")
	flag.StringVar(&changeURL, "change-url", "", "URL to send changes to.")
	flag.Int64Var(&maxBulkStringSize, "max-bulk-string-size", defaultMaxBulkStringSize,
		"max size of a bulk string to be accepted as input (in bytes).")
	flag.Parse()
	if version {
		common.PrintVersionAndExit()
	}
	if flag.NArg() < 1 {
		log.Fatal("Missing path to world")
	}
	var (
		err           error
		backend       Backend
		gcDur         time.Duration
		chDur         time.Duration
		changeTracker *changeTracker
	)
	if gcDur, err = time.ParseDuration(gcDuration); err != nil {
		log.Fatal(err)
	}
	// Set up the change notification plumbing.
	var changeChan <-chan time.Time
	useChangeNotification := changeURL != ""
	if useChangeNotification {
		if chDur, err = time.ParseDuration(changeDuration); err != nil {
			log.Fatal(err)
		}
		changeChan = time.Tick(chDur)
		changeTracker = newChangeTracker()
	} else {
		// We will never receive ticks on this.
		changeChan = make(<-chan time.Time)
	}
	path := flag.Arg(0)
	if driver == "sqlite" {
		if backend, err = NewSQLiteBackend(path, changeTracker, interleaved); err != nil {
			log.Fatal(err)
		}
	} else {
		if backend, err = NewLeveDBBackend(
			path, changeTracker, interleaved, cacheSize); err != nil {
			log.Fatal(err)
		}
	}
	defer backend.Shutdown()
	// A host containing a slash addresses a unix domain socket.
	var listener net.Listener
	var proto, address string
	if strings.ContainsRune(host, '/') {
		proto, address = "unix", host
	} else {
		proto, address = "tcp", fmt.Sprintf("%s:%d", host, port)
	}
	listener, err = net.Listen(proto, address)
	if err != nil {
		log.Fatal(err)
	}
	defer listener.Close()
	log.Printf("Server started at %s\n", listener.Addr())
	connChan := make(chan net.Conn)
	defer close(connChan)
	// os.Kill cannot be trapped by signal.Notify (go vet flags it);
	// listen for SIGTERM instead so a plain `kill` shuts down cleanly.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	go func() {
		for {
			conn, err := listener.Accept()
			if err != nil {
				log.Fatal(err)
			}
			log.Printf("Client accepted from: %s\n", conn.RemoteAddr())
			connChan <- conn
		}
	}()
	log.Printf("Doing garbage collection every: %s\n", gcDur)
	gcChan := time.Tick(gcDur)
	for {
		select {
		case conn := <-connChan:
			var session Session
			if session, err = backend.NewSession(); err != nil {
				log.Printf("Cannot create session: %s\n", err)
				conn.Close()
			} else {
				// Each client gets its own session and goroutine.
				go NewConnection(conn, session, maxBulkStringSize).Run()
			}
		case <-sigChan:
			log.Println("Shutting down")
			return
		case <-gcChan:
			log.Println("Starting garbage collection.")
			runtime.GC()
			log.Println("Garbage collection done.")
		case <-changeChan:
			if changeTracker != nil {
				changeTracker.FlushChanges(changeURL)
			}
		}
	}
}

View File

@ -0,0 +1,273 @@
// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"strconv"
"strings"
)
// RedisCommands is the set of Redis commands the parser understands.
// Each handler returns false to terminate the client session.
type RedisCommands interface {
	Hdel(hash, key []byte) bool
	Hget(hash, key []byte) bool
	Hset(hash, key, block []byte) bool
	Multi() bool
	Exec() bool
	Hkeys(hash []byte) bool
	HSpatial(hash, first, second []byte) bool
	Ping() bool
}
// RedisParser incrementally decodes the Redis wire protocol from a
// buffered reader and dispatches complete commands.
type RedisParser struct {
	reader            *bufio.Reader
	commands          RedisCommands
	missing           int64         // arguments still expected for the current array
	args              []interface{} // arguments collected so far
	maxBulkStringSize int64         // reject larger bulk strings (DoS guard)
}
// NewRedisParser creates a parser reading from reader, forwarding
// decoded commands to commands and rejecting bulk strings larger than
// maxBulkStringSize.
func NewRedisParser(reader *bufio.Reader,
	commands RedisCommands,
	maxBulkStringSize int64) *RedisParser {
	rp := new(RedisParser)
	rp.reader = reader
	rp.commands = commands
	rp.maxBulkStringSize = maxBulkStringSize
	return rp
}
// Parse reads and dispatches protocol lines until the input is
// exhausted or a handler signals to stop.
func (rp *RedisParser) Parse() {
	for line := rp.nextLine(); line != nil && rp.dispatch(line); line = rp.nextLine() {
	}
}
// nextLine reads one protocol line without its trailing CRLF. It
// returns nil on EOF or read error (non-EOF errors are logged).
func (rp *RedisParser) nextLine() []byte {
	line, err := rp.reader.ReadBytes('\n')
	if err == nil {
		return bytes.TrimRight(line, "\r\n")
	}
	if err != io.EOF {
		rp.consumeError(err)
	}
	return nil
}
// dispatch routes one protocol line by its type marker. An empty line
// terminates the session; error lines from the peer are ignored.
func (rp *RedisParser) dispatch(line []byte) bool {
	if len(line) < 1 {
		return false
	}
	switch line[0] {
	case '-':
		return true // ignore errors
	case ':':
		return rp.integer(line)
	case '+':
		return rp.simpleString(line)
	case '$':
		return rp.bulkString(line)
	case '*':
		return rp.array(line)
	}
	return true
}
// simpleString consumes a "+..." line as a string argument.
func (rp *RedisParser) simpleString(line []byte) bool {
	return rp.consumeSimpleString(string(line[1:]))
}
// integer parses a ":<num>" line and consumes it as an int64 argument.
func (rp *RedisParser) integer(line []byte) bool {
	i, err := strconv.ParseInt(string(line[1:]), 10, 64)
	if err != nil {
		return rp.consumeError(err)
	}
	return rp.consumeInteger(i)
}
// bulkString parses a "$<len>" header and consumes the payload plus
// its trailing CRLF. A negative length yields a nil (null) string.
// Overlong payloads are rejected to prevent denial of service.
func (rp *RedisParser) bulkString(line []byte) bool {
	i, err := strconv.ParseInt(string(line[1:]), 10, 64)
	if err != nil {
		return rp.consumeError(err)
	}
	switch {
	case i < 0:
		return rp.consumeBulkString(nil)
	case i == 0:
		return rp.consumeBulkString([]byte{})
	default:
		if i > rp.maxBulkStringSize { // prevent denial of service.
			return rp.consumeError(
				fmt.Errorf("Bulk string too large (%d bytes).\n", i))
		}
		data := make([]byte, i)
		// io.ReadFull loops over short reads for us and turns a
		// premature EOF into an error.
		if _, err = io.ReadFull(rp.reader, data); err != nil {
			return rp.consumeError(err)
		}
		// Skip the CRLF that terminates the payload.
		if _, err = rp.reader.ReadBytes('\n'); err != nil {
			return rp.consumeError(err)
		}
		return rp.consumeBulkString(data)
	}
}
// array parses a "*<num>" header announcing the number of arguments
// of the following command.
func (rp *RedisParser) array(line []byte) bool {
	var i int64
	var err error
	i, err = strconv.ParseInt(string(line[1:]), 10, 64)
	if err != nil {
		return rp.consumeError(err)
	}
	return rp.consumeArray(i)
}
// push collects one decoded argument. When the announced number of
// arguments is complete the command is executed and the argument
// list is reset.
func (rp *RedisParser) push(i interface{}) bool {
	rp.args = append(rp.args, i)
	rp.missing--
	if rp.missing <= 0 {
		rp.missing = 0
		res := rp.execute()
		rp.args = []interface{}{}
		return res
	}
	return true
}
// asString renders a parsed argument as a string. Strings and byte
// slices are converted directly; everything else goes through fmt.
func asString(i interface{}) string {
	if s, ok := i.(string); ok {
		return s
	}
	if b, ok := i.([]byte); ok {
		return string(b)
	}
	return fmt.Sprintf("%s", i)
}
// byteArgs checks that the current command has at least n arguments
// after the command name and that all of them are byte slices. It
// returns them in order, or ok == false after logging a warning.
func (rp *RedisParser) byteArgs(cmd string, n int) (args [][]byte, ok bool) {
	if len(rp.args) < n+1 {
		log.Printf("WARN: Missing arguments for %s.\n", cmd)
		return nil, false
	}
	args = make([][]byte, n)
	for idx := 0; idx < n; idx++ {
		b, isBytes := rp.args[idx+1].([]byte)
		if !isBytes {
			log.Printf("WARN: %s data are not byte slices.\n", cmd)
			return nil, false
		}
		args[idx] = b
	}
	return args, true
}

// execute dispatches one completely parsed command (rp.args) to the
// RedisCommands handler. It returns false to terminate the session on
// malformed or unknown commands. (The original logged "HGET" for a
// malformed HDEL; messages are now derived from the actual command.)
func (rp *RedisParser) execute() bool {
	if len(rp.args) < 1 {
		log.Println("WARN: Too few arguments for command.")
		return false
	}
	cmd := strings.ToUpper(asString(rp.args[0]))
	switch cmd {
	case "HDEL":
		if args, ok := rp.byteArgs(cmd, 2); ok {
			return rp.commands.Hdel(args[0], args[1])
		}
	case "HGET":
		if args, ok := rp.byteArgs(cmd, 2); ok {
			return rp.commands.Hget(args[0], args[1])
		}
	case "HSET":
		if args, ok := rp.byteArgs(cmd, 3); ok {
			return rp.commands.Hset(args[0], args[1], args[2])
		}
	case "MULTI":
		return rp.commands.Multi()
	case "EXEC":
		return rp.commands.Exec()
	case "HKEYS":
		if args, ok := rp.byteArgs(cmd, 1); ok {
			return rp.commands.Hkeys(args[0])
		}
	case "HSPATIAL":
		if args, ok := rp.byteArgs(cmd, 3); ok {
			return rp.commands.HSpatial(args[0], args[1], args[2])
		}
	case "PING":
		return rp.commands.Ping()
	default:
		log.Printf("WARN: unknown command: '%s'\n", cmd)
	}
	return false
}
// consumeSimpleString feeds a simple string into the argument list.
func (rp *RedisParser) consumeSimpleString(s string) bool {
	return rp.push(s)
}
// consumeBulkString feeds a bulk string into the argument list.
func (rp *RedisParser) consumeBulkString(data []byte) bool {
	return rp.push(data)
}
// consumeInteger feeds an integer into the argument list.
func (rp *RedisParser) consumeInteger(i int64) bool {
	return rp.push(i)
}
// consumeError logs a protocol-level error and keeps the session alive.
func (rp *RedisParser) consumeError(err error) bool {
	log.Printf("error: %s\n", err)
	return true
}
// consumeArray starts collecting i arguments for the next command.
// Nested and null arrays are not supported.
func (rp *RedisParser) consumeArray(i int64) bool {
	if rp.missing > 0 {
		log.Println("WARN: Nested arrays are not supported!")
		return false
	}
	if i < 0 {
		log.Println("Null arrays are not supported")
		return false
	}
	rp.missing = i
	return true
}

543
cmd/mtredisalize/sqlite.go Normal file
View File

@ -0,0 +1,543 @@
// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"database/sql"
"log"
"sync"
_ "github.com/mattn/go-sqlite3"
"bitbucket.org/s_l_teichmann/mtsatellite/common"
)
// globalLock serializes access to the SQLite database: writers take
// the write lock, readers the read lock.
var globalLock sync.RWMutex

// SQL statements used on the blocks table.
const (
	deleteSQL = "DELETE FROM blocks WHERE pos = ?"
	fetchSQL  = "SELECT data FROM blocks WHERE pos = ?"
	existsSQL = "SELECT 1 FROM blocks WHERE pos = ?"
	updateSQL = "UPDATE blocks SET data = ? WHERE pos = ?"
	insertSQL = "INSERT INTO blocks (pos, data) VALUES (?, ?)"
	countSQL  = "SELECT count(*) FROM blocks"
	keysSQL   = "SELECT pos FROM blocks"
	rangeSQL  = "SELECT pos, data FROM blocks WHERE pos BETWEEN ? AND ? ORDER BY pos"
)
// SQLiteBackend is a Backend implementation on top of an SQLite
// database with a blocks table, using one prepared statement per
// operation.
type SQLiteBackend struct {
	db            *sql.DB
	encoder       common.KeyEncoder // int64 position -> wire key
	decoder       common.KeyDecoder // wire key -> int64 position
	changeTracker *changeTracker    // optional; nil if change notification is off
	interleaved   bool              // keys stored interleaved instead of plain
	coverage      *common.Coverage3D // spatial index; only built for plain keys
	existsStmt    *sql.Stmt
	deleteStmt    *sql.Stmt
	fetchStmt     *sql.Stmt
	insertStmt    *sql.Stmt
	updateStmt    *sql.Stmt
	countStmt     *sql.Stmt
	keysStmt      *sql.Stmt
	rangeStmt     *sql.Stmt
}
// SQLiteSession is a Session on a SQLiteBackend; tx is the currently
// running transaction, or nil.
type SQLiteSession struct {
	backend *SQLiteBackend
	tx      *sql.Tx
}
// NewSession opens a new session on this backend.
func (sqlb *SQLiteBackend) NewSession() (Session, error) {
	session := &SQLiteSession{backend: sqlb}
	return session, nil
}
// Close rolls back a still-running transaction, if any.
func (ss *SQLiteSession) Close() error {
	t := ss.tx
	if t != nil {
		ss.tx = nil
		return t.Rollback()
	}
	return nil
}
// NewSQLiteBackend opens the SQLite database at path, prepares all
// statements and, for plain (non-interleaved) keys, builds a coverage
// index. On any failure all resources acquired so far are released —
// including on a coverage build failure, which the original leaked.
func NewSQLiteBackend(
	path string,
	changeTracker *changeTracker, interleaved bool) (sqlb *SQLiteBackend, err error) {
	res := SQLiteBackend{interleaved: interleaved, changeTracker: changeTracker}
	if res.db, err = sql.Open("sqlite3", path); err != nil {
		return
	}
	// Prepare every statement; tear everything down on the first failure.
	for _, p := range []struct {
		stmt  **sql.Stmt
		query string
	}{
		{&res.existsStmt, existsSQL},
		{&res.fetchStmt, fetchSQL},
		{&res.deleteStmt, deleteSQL},
		{&res.insertStmt, insertSQL},
		{&res.updateStmt, updateSQL},
		{&res.countStmt, countSQL},
		{&res.keysStmt, keysSQL},
		{&res.rangeStmt, rangeSQL},
	} {
		if *p.stmt, err = res.db.Prepare(p.query); err != nil {
			res.closeAll()
			return
		}
	}
	if interleaved {
		res.encoder = common.EncodeStringToBytesFromInterleaved
		res.decoder = common.DecodeStringFromBytesToInterleaved
	} else {
		res.encoder = common.EncodeStringToBytes
		res.decoder = common.DecodeStringFromBytes
		if err = res.buildCoverage(); err != nil {
			res.closeAll()
			return
		}
	}
	sqlb = &res
	return
}
// buildCoverage scans all block positions and feeds them into a fresh
// Coverage3D spatial index. Only used for plain key encoding.
func (sqlb *SQLiteBackend) buildCoverage() (err error) {
	log.Println("INFO: Start building coverage index (this may take some time)...")
	sqlb.coverage = common.NewCoverage3D()
	var rows *sql.Rows
	if rows, err = sqlb.keysStmt.Query(); err != nil {
		return
	}
	defer rows.Close()
	for rows.Next() {
		var key int64
		if err = rows.Scan(&key); err != nil {
			return
		}
		sqlb.coverage.Insert(common.PlainToCoord(key))
	}
	err = rows.Err()
	log.Println("INFO: Finished building coverage index.")
	return
}
// closeStmt closes the prepared statement behind stmt (if any) and
// nils the pointer so repeated calls are harmless.
func closeStmt(stmt **sql.Stmt) error {
	s := *stmt
	if s == nil {
		return nil
	}
	*stmt = nil
	return s.Close()
}
// closeDB closes the database behind db (if any) and nils the pointer
// so repeated calls are harmless.
func closeDB(db **sql.DB) error {
	d := *db
	if d == nil {
		return nil
	}
	*db = nil
	return d.Close()
}
// closeAll closes all prepared statements and finally the database.
// Only the database close error is reported.
func (sqlb *SQLiteBackend) closeAll() error {
	closeStmt(&sqlb.deleteStmt)
	closeStmt(&sqlb.fetchStmt)
	closeStmt(&sqlb.insertStmt)
	closeStmt(&sqlb.updateStmt)
	closeStmt(&sqlb.existsStmt)
	closeStmt(&sqlb.countStmt)
	closeStmt(&sqlb.keysStmt)
	closeStmt(&sqlb.rangeStmt)
	return closeDB(&sqlb.db)
}
// Shutdown closes statements and database under the global write lock.
func (sqlb *SQLiteBackend) Shutdown() error {
	globalLock.Lock()
	defer globalLock.Unlock()
	return sqlb.closeAll()
}
// txStmt returns stmt bound to the running transaction, or the plain
// statement if no transaction is active.
func (ss *SQLiteSession) txStmt(stmt *sql.Stmt) *sql.Stmt {
	if ss.tx != nil {
		return ss.tx.Stmt(stmt)
	}
	return stmt
}
// Del removes the block at the position encoded in key. success
// reports whether the row existed. The hash argument is unused.
func (ss *SQLiteSession) Del(hash, key []byte) (success bool, err error) {
	var pos int64
	if pos, err = ss.backend.decoder(key); err != nil {
		return
	}
	globalLock.Lock()
	defer globalLock.Unlock()
	// Probe first so we can report whether something was deleted.
	existsStmt := ss.txStmt(ss.backend.existsStmt)
	var x int
	err2 := existsStmt.QueryRow(pos).Scan(&x)
	if err2 == sql.ErrNoRows {
		success = false
		return
	}
	if err2 != nil {
		err = err2
		return
	}
	success = true
	deleteStmt := ss.txStmt(ss.backend.deleteStmt)
	_, err = deleteStmt.Exec(pos)
	return
}
// Fetch returns the data of the block at the position encoded in key,
// or nil data if there is no such row. The hash argument is unused.
func (ss *SQLiteSession) Fetch(hash, key []byte) (data []byte, err error) {
	var pos int64
	if pos, err = ss.backend.decoder(key); err != nil {
		return
	}
	globalLock.RLock()
	defer globalLock.RUnlock()
	fetchStmt := ss.txStmt(ss.backend.fetchStmt)
	err2 := fetchStmt.QueryRow(pos).Scan(&data)
	// A missing row is not an error; data simply stays nil.
	if err2 == sql.ErrNoRows {
		return
	}
	err = err2
	return
}
// InTransaction returns true if a transaction is currently running.
func (ss *SQLiteSession) InTransaction() bool {
	return ss.tx != nil
}
// Store inserts or updates the block at the position encoded in key
// and reports whether it already existed. Coverage index and change
// tracker are updated as a side effect.
func (ss *SQLiteSession) Store(hash, key, value []byte) (exists bool, err error) {
	var pos int64
	if pos, err = ss.backend.decoder(key); err != nil {
		return
	}
	globalLock.Lock()
	defer globalLock.Unlock()
	existsStmt := ss.txStmt(ss.backend.existsStmt)
	var x int
	err2 := existsStmt.QueryRow(pos).Scan(&x)
	if err2 == sql.ErrNoRows {
		exists = false
	} else if err2 != nil {
		err = err2
		return
	} else {
		exists = true
	}
	// UPDATE existing rows, INSERT new ones.
	if exists {
		updateStmt := ss.txStmt(ss.backend.updateStmt)
		_, err = updateStmt.Exec(value, pos)
	} else {
		insertStmt := ss.txStmt(ss.backend.insertStmt)
		_, err = insertStmt.Exec(pos, value)
	}
	if err != nil {
		return
	}
	// This is technically too early because this is done in transactions
	// which are committed (and possibly fail) later.
	if ss.backend.changeTracker != nil || ss.backend.coverage != nil {
		c := common.PlainToCoord(pos)
		if ss.backend.coverage != nil && !exists {
			ss.backend.coverage.Insert(c)
		}
		if ss.backend.changeTracker != nil {
			ss.backend.changeTracker.BlockChanged(c)
		}
	}
	return
}
// BeginTransaction starts a database transaction. A second call while
// one is running only logs a warning.
func (ss *SQLiteSession) BeginTransaction() (err error) {
	if ss.tx != nil {
		log.Println("WARN: Already running transaction.")
		return nil
	}
	globalLock.Lock()
	defer globalLock.Unlock()
	ss.tx, err = ss.backend.db.Begin()
	return
}
// CommitTransaction commits the running transaction. If none is
// running a warning is logged and nil is returned.
func (ss *SQLiteSession) CommitTransaction() error {
	if ss.tx == nil {
		log.Println("WARN: No transaction running.")
		return nil
	}
	globalLock.Lock()
	defer globalLock.Unlock()
	// Clear the session's transaction before committing so the
	// session is back in auto-commit mode afterwards either way.
	tx := ss.tx
	ss.tx = nil
	return tx.Commit()
}
// AllKeys streams the encoded keys of every block in the database and
// also returns the total number of keys. hash is ignored by this
// backend. The global read lock is acquired here and released by the
// streaming goroutine (or before an early error return), so it stays
// held until the channel is drained or done is closed.
func (ss *SQLiteSession) AllKeys(
	hash []byte,
	done <-chan struct{}) (<-chan []byte, int, error) {
	globalLock.RLock()
	countStmt := ss.txStmt(ss.backend.countStmt)
	var n int
	var err error
	if err = countStmt.QueryRow().Scan(&n); err != nil {
		// Defensive: a count query should always yield one row;
		// treat "no rows" as zero keys rather than an error.
		if err == sql.ErrNoRows {
			err = nil
		}
		globalLock.RUnlock()
		return nil, n, err
	}
	keysStmt := ss.txStmt(ss.backend.keysStmt)
	var rows *sql.Rows
	if rows, err = keysStmt.Query(); err != nil {
		globalLock.RUnlock()
		return nil, n, err
	}
	keys := make(chan []byte)
	go func() {
		// Release in reverse order of acquisition: channel first,
		// then the rows, then the read lock.
		defer globalLock.RUnlock()
		defer rows.Close()
		defer close(keys)
		var err error
		for rows.Next() {
			var key int64
			if err = rows.Scan(&key); err != nil {
				log.Printf("WARN: %s\n", err)
				break
			}
			// Re-encode the numeric database key into its external form.
			var encoded []byte
			if encoded, err = ss.backend.encoder(key); err != nil {
				log.Printf("Cannot encode key: %d %s\n", key, err)
				break
			}
			select {
			case keys <- encoded:
			case <-done:
				// Consumer gave up; stop streaming.
				return
			}
		}
	}()
	return keys, n, nil
}
// SpatialQuery streams all blocks inside the box spanned by first and
// second, dispatching to the interleaved or plain implementation
// depending on the backend's key encoding. hash is ignored.
func (ss *SQLiteSession) SpatialQuery(
	hash, first, second []byte,
	done <-chan struct{}) (<-chan Block, error) {
	if !ss.backend.interleaved {
		return ss.plainSpatialQuery(first, second, done)
	}
	return ss.interleavedSpatialQuery(first, second, done)
}
// interleavedSpatialQuery answers a box query for databases that store
// blocks under interleaved (Z-order) keys. It decodes the two corner
// positions, normalizes them to a cuboid, then scans the Z-code range
// [zmin, zmax]; whenever a scanned position falls outside the box it
// calls common.BigMin to skip ahead to the next Z-code that can lie
// inside the box and restarts the SQL range query there. The global
// read lock is held until the result channel is closed.
func (ss *SQLiteSession) interleavedSpatialQuery(
	first, second []byte,
	done <-chan struct{}) (<-chan Block, error) {
	var (
		firstKey  int64
		secondKey int64
		err       error
	)
	if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
		return nil, err
	}
	if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
		return nil, err
	}
	// Normalize the corners to min/max so the cuboid is well-formed.
	c1 := common.ClipCoord(common.PlainToCoord(firstKey))
	c2 := common.ClipCoord(common.PlainToCoord(secondKey))
	c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
	blocks := make(chan Block)
	globalLock.RLock()
	go func() {
		defer close(blocks)
		defer globalLock.RUnlock()
		zmin, zmax := common.CoordToInterleaved(c1), common.CoordToInterleaved(c2)
		// Should not be necessary.
		zmin, zmax = common.Order64(zmin, zmax)
		cub := common.Cuboid{P1: c1, P2: c2}
		rangeStmt := ss.txStmt(ss.backend.rangeStmt)
		zcode := zmin
	loop:
		rows, err := rangeStmt.Query(zcode, zmax)
		if err != nil {
			log.Printf("error: fetching range failed: %s\n", err)
			return
		}
		for rows.Next() {
			var data []byte
			if err = rows.Scan(&zcode, &data); err != nil {
				rows.Close()
				log.Printf("error: scanning row failed: %s\n", err)
				return
			}
			c := common.InterleavedToCoord(zcode)
			if cub.Contains(c) {
				// Inside the query box: hand the block to the consumer.
				key := common.StringToBytes(common.CoordToPlain(c))
				//fmt.Printf("sending: %q\n", c)
				select {
				case blocks <- Block{Key: key, Data: data}:
				case <-done:
					return
				}
			} else {
				// Left the box: close the current result set, jump
				// ahead on the Z-curve and issue a fresh range query.
				if err = rows.Close(); err != nil {
					log.Printf("error: closing range failed: %s\n", err)
					return
				}
				zcode = common.BigMin(zmin, zmax, zcode)
				goto loop
			}
		}
		if err = rows.Err(); err != nil {
			log.Printf("error: iterating range failed: %s\n", err)
		}
		if err = rows.Close(); err != nil {
			log.Printf("error: closing range failed: %s\n", err)
		}
	}()
	return blocks, nil
}
// plainSpatialQuery answers a box query for databases with plain
// (non-interleaved) keys. It asks the in-memory coverage index for the
// spans overlapping the box and issues one SQL range query per Y value
// of each span, streaming matching blocks to the returned channel.
// The global read lock is held until the channel is closed.
// NOTE(review): ss.backend.coverage is dereferenced unconditionally
// here — presumably it is always non-nil for plain backends; confirm
// against the backend constructor.
func (ss *SQLiteSession) plainSpatialQuery(
	first, second []byte,
	done <-chan struct{}) (<-chan Block, error) {
	var (
		firstKey  int64
		secondKey int64
		err       error
	)
	if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
		return nil, err
	}
	if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
		return nil, err
	}
	// Normalize the corners to min/max so the box is well-formed.
	c1 := common.PlainToCoord(firstKey)
	c2 := common.PlainToCoord(secondKey)
	c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
	blocks := make(chan Block)
	globalLock.RLock()
	go func() {
		defer globalLock.RUnlock()
		defer close(blocks)
		rangeStmt := ss.txStmt(ss.backend.rangeStmt)
		// send drains one range query's rows into the channel.
		// It returns false when streaming should stop (error or done).
		send := func(rows *sql.Rows, err error) bool {
			if err != nil {
				log.Printf("Error in range query: %s\n", err)
				return false
			}
			defer rows.Close()
			for rows.Next() {
				var key int64
				var data []byte
				if err = rows.Scan(&key, &data); err != nil {
					log.Printf("Error in scanning row: %s\n", err)
					return false
				}
				var encodedKey []byte
				if encodedKey, err = common.EncodeStringToBytes(key); err != nil {
					log.Printf("Key encoding failed: %s\n", err)
					return false
				}
				select {
				case blocks <- Block{Key: encodedKey, Data: data}:
				case <-done:
					return false
				}
			}
			if err = rows.Err(); err != nil {
				log.Printf("Error in range query: %s\n", err)
				return false
			}
			return true
		}
		var a, b common.Coord
		for _, r := range ss.backend.coverage.Query(c1, c2) {
			a.Z, b.Z = int16(r.Z), int16(r.Z)
			a.X, b.X = int16(r.X1), int16(r.X2)
			// log.Printf("y1 y2 x1 x2 z: %d %d, %d %d, %d\n", r.Y1, r.Y2, r.X1, r.X2, r.Z)
			// One range query per Y level, scanning top-down.
			for a.Y = r.Y2; a.Y >= r.Y1; a.Y-- {
				b.Y = a.Y
				from, to := common.CoordToPlain(a), common.CoordToPlain(b)
				if !send(rangeStmt.Query(from, to)) {
					return
				}
			}
		}
	}()
	return blocks, nil
}