Moved subprograms into the cmd folder to clean up the project structure.

Sascha L. Teichmann
2014-09-13 19:07:20 +02:00
parent b445cfb33a
commit fd64d20b76
16 changed files with 4 additions and 2 deletions


@@ -0,0 +1,32 @@
// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import "errors"
var ErrNotImplemented = errors.New("Not implemented")
type (
Block struct {
Key []byte
Data []byte
}
Session interface {
Fetch(hash, key []byte) ([]byte, error)
InTransaction() bool
Store(hash, key, value []byte) (bool, error)
AllKeys(hash []byte, done chan struct{}) (chan []byte, int, error)
SpatialQuery(hash, first, second []byte, done chan struct{}) (chan Block, error)
BeginTransaction() error
CommitTransaction() error
Close() error
}
Backend interface {
NewSession() (Session, error)
Shutdown() error
}
)
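
The Session and Backend interfaces above are the seam between the Redis front end and the storage drivers. A minimal, illustrative sketch of how a caller might drive them for a single write-then-read (the helper name is an assumption, not part of this commit):

// Illustrative sketch only: driving a Backend/Session directly.
func storeAndFetch(backend Backend, hash, key, value []byte) ([]byte, error) {
    session, err := backend.NewSession()
    if err != nil {
        return nil, err
    }
    defer session.Close()
    // Group the write in a transaction, mirroring Redis MULTI/EXEC.
    if err := session.BeginTransaction(); err != nil {
        return nil, err
    }
    if _, err := session.Store(hash, key, value); err != nil {
        return nil, err
    }
    if err := session.CommitTransaction(); err != nil {
        return nil, err
    }
    return session.Fetch(hash, key)
}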


@@ -0,0 +1,246 @@
// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"bufio"
"fmt"
"log"
"net"
)
var (
redisOk = []byte("+OK\r\n")
redisError = []byte("-ERR\r\n")
redisNoSuchBlock = []byte("$-1\r\n")
redisCrnl = []byte("\r\n")
redisEmptyArray = []byte("*0\r\n")
redisQueued = []byte("+QUEUED\r\n")
)
type Connection struct {
conn net.Conn
session Session
boolArray []bool
}
func NewConnection(conn net.Conn, session Session) *Connection {
return &Connection{
conn: conn,
session: session,
boolArray: []bool{}}
}
func (c *Connection) Run() {
defer func() {
c.session.Close()
c.conn.Close()
}()
rce := NewRedisCommandExecutor(c)
r := bufio.NewReaderSize(c.conn, 8*1024)
parser := NewRedisParser(r, rce)
parser.Parse()
log.Println("client disconnected")
}
func logError(err error) {
log.Printf("ERROR: %s", err)
}
func (c *Connection) Hget(hash, key []byte) bool {
var err error
var data []byte
if data, err = c.session.Fetch(hash, key); err != nil {
return c.writeError(err)
}
return c.writeBlock(data)
}
func (c *Connection) Hset(hash, key, data []byte) bool {
var err error
var exists bool
if exists, err = c.session.Store(hash, key, data); err != nil {
return c.writeError(err)
}
if c.session.InTransaction() {
c.boolArray = append(c.boolArray, exists)
return c.writeQueued()
}
return c.writeBool(exists)
}
func (c *Connection) Multi() bool {
if c.session.InTransaction() {
log.Println("WARN: Already running transaction.")
} else {
if err := c.session.BeginTransaction(); err != nil {
return c.writeError(err)
}
}
return c.writeOk()
}
func (c *Connection) Exec() bool {
if !c.session.InTransaction() {
return c.writeEmptyArray()
}
arr := c.boolArray
c.boolArray = []bool{}
if err := c.session.CommitTransaction(); err != nil {
return c.writeError(err)
}
return c.writeBoolArray(arr)
}
func (c *Connection) Hkeys(hash []byte) bool {
var (
err error
n int
keys chan []byte
done = make(chan struct{})
)
defer close(done)
if keys, n, err = c.session.AllKeys(hash, done); err != nil {
return c.writeError(err)
}
if n == 0 {
return c.writeEmptyArray()
}
if _, err := c.conn.Write([]byte(fmt.Sprintf("*%d\r\n", n))); err != nil {
logError(err)
return false
}
for key := range keys {
if err = c.writeBulkString(key); err != nil {
logError(err)
return false
}
}
return true
}
func (c *Connection) HSpatial(hash, first, second []byte) bool {
var (
err error
blocks chan Block
done = make(chan struct{})
)
defer close(done)
if blocks, err = c.session.SpatialQuery(hash, first, second, done); err != nil {
return c.writeError(err)
}
for block := range blocks {
if err = c.writeBulkString(block.Key); err != nil {
logError(err)
return false
}
if err = c.writeBulkString(block.Data); err != nil {
logError(err)
return false
}
}
if err = c.writeBulkString(nil); err != nil {
logError(err)
return false
}
return true
}
func (c *Connection) writeError(err error) bool {
logError(err)
if _, err = c.conn.Write(redisError); err != nil {
logError(err)
return false
}
return true
}
func (c *Connection) writeEmptyArray() bool {
if _, err := c.conn.Write(redisEmptyArray); err != nil {
logError(err)
return false
}
return true
}
func asInt(b bool) int {
if b {
return 1
}
return 0
}
func (c *Connection) writeBool(b bool) bool {
if _, err := c.conn.Write([]byte(fmt.Sprintf(":%d\r\n", asInt(b)))); err != nil {
logError(err)
return false
}
return true
}
func (c *Connection) writeBoolArray(arr []bool) bool {
if _, err := c.conn.Write([]byte(fmt.Sprintf("*%d\r\n", len(arr)))); err != nil {
logError(err)
return false
}
for _, b := range arr {
if !c.writeBool(b) {
return false
}
}
return true
}
func (c *Connection) writeOk() bool {
if _, err := c.conn.Write(redisOk); err != nil {
logError(err)
return false
}
return true
}
func (c *Connection) writeQueued() bool {
if _, err := c.conn.Write(redisQueued); err != nil {
logError(err)
return false
}
return true
}
func (c *Connection) writeBlock(data []byte) bool {
if err := c.writeBulkString(data); err != nil {
logError(err)
return false
}
return true
}
func (c *Connection) writeBulkString(data []byte) (err error) {
con := c.conn
if data == nil {
_, err = con.Write(redisNoSuchBlock)
} else {
if _, err = con.Write([]byte(fmt.Sprintf("$%d\r\n", len(data)))); err != nil {
return
}
if _, err = con.Write(data); err != nil {
return
}
_, err = con.Write(redisCrnl)
}
return
}
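
The write helpers above emit plain RESP frames; the mapping below is derived directly from the format strings in this file and is meant only as a quick reference (the function name is illustrative):

// Illustrative sketch only: the RESP frames the helpers above put on the wire.
func respFrameExamples() []string {
    return []string{
        "+OK\r\n",            // writeOk
        "+QUEUED\r\n",        // writeQueued (HSET inside MULTI)
        ":1\r\n",             // writeBool(true)
        ":0\r\n",             // writeBool(false)
        "$5\r\nhello\r\n",    // writeBulkString([]byte("hello"))
        "$-1\r\n",            // writeBulkString(nil): no such block
        "*2\r\n:1\r\n:0\r\n", // writeBoolArray([]bool{true, false})
        "*0\r\n",             // writeEmptyArray
        "-ERR\r\n",           // writeError
    }
}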

cmd/mtredisalize/leveldb.go

@@ -0,0 +1,373 @@
// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"log"
"sync"
"bitbucket.org/s_l_teichmann/mtredisalize/common"
leveldb "github.com/jmhodges/levigo"
)
type LevelDBBackend struct {
cache *leveldb.Cache
db *leveldb.DB
interleaved bool
encoder common.KeyTranscoder
decoder common.KeyTranscoder
mutex sync.RWMutex
}
type LevelDBSession struct {
backend *LevelDBBackend
tx *leveldb.WriteBatch
}
func NewLeveDBBackend(
path string,
interleaved bool,
cacheSize int) (ldb *LevelDBBackend, err error) {
opts := leveldb.NewOptions()
var cache *leveldb.Cache
if cacheSize > 0 {
cache = leveldb.NewLRUCache(cacheSize * 1024 * 1024)
opts.SetCache(cache)
}
opts.SetCreateIfMissing(true)
var db *leveldb.DB
if db, err = leveldb.Open(path, opts); err != nil {
if cache != nil {
cache.Close()
}
return
}
var (
encoder common.KeyTranscoder
decoder common.KeyTranscoder
)
if interleaved {
encoder = common.TranscodeInterleavedToPlain
decoder = common.TranscodePlainToInterleaved
} else {
encoder = common.IdentityTranscoder
decoder = common.IdentityTranscoder
}
ldb = &LevelDBBackend{
cache: cache,
db: db,
interleaved: interleaved,
encoder: encoder,
decoder: decoder}
return
}
func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) {
ldb.mutex.RLock()
f(ldb.db)
ldb.mutex.RUnlock()
}
func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) {
ldb.mutex.Lock()
f(ldb.db)
ldb.mutex.Unlock()
}
func (ldb *LevelDBBackend) NewSession() (Session, error) {
return &LevelDBSession{ldb, nil}, nil
}
func (ldbs *LevelDBSession) Close() error {
if ldbs.tx != nil {
ldbs.tx.Close()
}
return nil
}
func (ldb *LevelDBBackend) Shutdown() error {
ldb.db.Close()
if ldb.cache != nil {
ldb.cache.Close()
}
return nil
}
func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) {
if key, err = ldbs.backend.decoder(key); err != nil {
return
}
ldbs.backend.doRead(func(db *leveldb.DB) {
ro := leveldb.NewReadOptions()
value, err = ldbs.backend.db.Get(ro, key)
//if err != nil {
// log.Printf("Fetch key '%s' failed.\n", key)
//} else {
// log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value))
//}
ro.Close()
})
return
}
func (ldbs *LevelDBSession) InTransaction() bool {
return ldbs.tx != nil
}
func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) {
ro := leveldb.NewReadOptions()
defer ro.Close()
var data []byte
if data, err = db.Get(ro, key); err != nil {
return
}
exists = data != nil
return
}
func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) {
if key, err = ldbs.backend.decoder(key); err != nil {
return
}
ldbs.backend.doWrite(func(db *leveldb.DB) {
if exists, err = keyExists(db, key); err != nil {
return
}
if ldbs.tx != nil {
ldbs.tx.Put(key, value)
return
}
wo := leveldb.NewWriteOptions()
err = ldbs.backend.db.Put(wo, key, value)
wo.Close()
})
return
}
func (ldbs *LevelDBSession) BeginTransaction() error {
ldbs.tx = leveldb.NewWriteBatch()
return nil
}
func (ldbs *LevelDBSession) CommitTransaction() (err error) {
tx := ldbs.tx
if tx == nil {
log.Println("WARN: No transaction running.")
return
}
ldbs.tx = nil
ldbs.backend.doWrite(func(db *leveldb.DB) {
wo := leveldb.NewWriteOptions()
wo.SetSync(true)
err = db.Write(wo, tx)
wo.Close()
tx.Close()
})
return
}
func (ldbs *LevelDBSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) {
ldbs.backend.mutex.RLock()
ro := leveldb.NewReadOptions()
ro.SetFillCache(false)
it := ldbs.backend.db.NewIterator(ro)
it.SeekToFirst()
for ; it.Valid(); it.Next() {
n++
}
if err = it.GetError(); err != nil {
it.Close()
ro.Close()
ldbs.backend.mutex.RUnlock()
return
}
keys = make(chan []byte)
go func() {
ldbs.backend.mutex.RUnlock()
defer ro.Close()
defer close(keys)
defer it.Close()
it.SeekToFirst()
encoder := ldbs.backend.encoder
for ; it.Valid(); it.Next() {
if key, err := encoder(it.Key()); err == nil {
select {
case keys <- key:
case <-done:
return
}
} else {
log.Printf("WARN: %s\n", err)
return
}
}
if err := it.GetError(); err != nil {
log.Printf("WARN: %s\n", err)
}
}()
return
}
func (ldbs *LevelDBSession) SpatialQuery(hash, first, second []byte, done chan struct{}) (chan Block, error) {
if ldbs.backend.interleaved {
return ldbs.interleavedSpatialQuery(first, second, done)
}
return ldbs.plainSpatialQuery(first, second, done)
}
func (ldbs *LevelDBSession) plainSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) {
var (
firstKey int64
secondKey int64
)
if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
return
}
if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
return
}
c1 := common.PlainToCoord(firstKey)
c2 := common.PlainToCoord(secondKey)
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks = make(chan Block)
go func() {
defer close(blocks)
ldbs.backend.mutex.RLock()
defer ldbs.backend.mutex.RUnlock()
ro := leveldb.NewReadOptions()
defer ro.Close()
ro.SetFillCache(false)
it := ldbs.backend.db.NewIterator(ro)
defer it.Close()
a, b := common.Coord{X: c1.X}, common.Coord{X: c2.X}
var err error
for a.Z = c1.Z; a.Z <= c2.Z; a.Z++ {
b.Z = a.Z
for a.Y = c1.Y; a.Y <= c2.Y; a.Y++ {
b.Y = a.Y
from, to := order(common.CoordToPlain(a), common.CoordToPlain(b))
it.Seek(common.StringToBytes(from))
for ; it.Valid(); it.Next() {
var (
key = it.Key()
pos int64
)
if pos, err = common.DecodeStringFromBytes(key); err != nil {
log.Printf("decoding key failed: %s", err)
return
}
if pos > to {
break
}
select {
case blocks <- Block{Key: key, Data: it.Value()}:
case <-done:
return
}
}
if err = it.GetError(); err != nil {
log.Printf("iterating failed: %s", err)
return
}
}
}
}()
return
}
func (ldbs *LevelDBSession) interleavedSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) {
var (
firstKey int64
secondKey int64
)
if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
return
}
if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
return
}
c1 := common.ClipCoord(common.PlainToCoord(firstKey))
c2 := common.ClipCoord(common.PlainToCoord(secondKey))
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks = make(chan Block)
go func() {
defer close(blocks)
ldbs.backend.mutex.RLock()
defer ldbs.backend.mutex.RUnlock()
ro := leveldb.NewReadOptions()
defer ro.Close()
ro.SetFillCache(false)
it := ldbs.backend.db.NewIterator(ro)
defer it.Close()
zmin, zmax := common.CoordToInterleaved(c1), common.CoordToInterleaved(c2)
// Should not be necessary.
zmin, zmax = order(zmin, zmax)
var (
cub = common.Cuboid{P1: c1, P2: c2}
err error
encodedKey []byte
)
//log.Printf("seeking to: %d", zmin)
it.Seek(common.ToBigEndian(zmin))
for it.Valid() {
zcode := common.FromBigEndian(it.Key())
if zcode > zmax {
break
}
if c := common.InterleavedToCoord(zcode); cub.Contains(c) {
if encodedKey, err = common.EncodeStringToBytes(common.CoordToPlain(c)); err != nil {
log.Printf("error encoding key: %s", err)
return
}
select {
case blocks <- Block{Key: encodedKey, Data: it.Value()}:
case <-done:
return
}
it.Next()
} else {
next := common.BigMin(zmin, zmax, zcode)
//log.Printf("seeking to: %d", next)
it.Seek(common.ToBigEndian(next))
//log.Printf("seeking done: %d", next)
}
}
//log.Println("iterating done")
if err = it.GetError(); err != nil {
log.Printf("error while iterating: %s", err)
return
}
}()
return
}
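
AllKeys and SpatialQuery stream their results over a channel and take a done channel so the consumer can stop the producing goroutine early, which is exactly how Hkeys and HSpatial in the connection code use them. A minimal, illustrative consumer (the function name is an assumption):

// Illustrative sketch only: consuming a spatial query and cancelling it early.
func firstBlocks(session Session, hash, first, second []byte, max int) ([]Block, error) {
    done := make(chan struct{})
    defer close(done) // closing done lets the producer goroutine return
    blocks, err := session.SpatialQuery(hash, first, second, done)
    if err != nil {
        return nil, err
    }
    var result []Block
    for block := range blocks {
        result = append(result, block)
        if len(result) >= max {
            break // the deferred close(done) unblocks the sender
        }
    }
    return result, nil
}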

cmd/mtredisalize/main.go

@@ -0,0 +1,132 @@
// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
"runtime"
"time"
)
const (
Version = "0.3"
GCDuration = "24h"
)
func usage() {
fmt.Fprintf(os.Stderr,
"Usage: %s [<options>] <database>\n", os.Args[0])
fmt.Fprintln(os.Stderr, "Options:")
flag.PrintDefaults()
}
func main() {
var (
port int
host string
driver string
cacheSize int
version bool
interleaved bool
gcDuration string
)
flag.Usage = usage
flag.IntVar(&port, "port", 6379, "port to bind")
flag.StringVar(&driver, "driver", "leveldb", "type of database (leveldb, sqlite)")
flag.StringVar(&host, "host", "", "host to bind")
flag.IntVar(&cacheSize, "cache", 32, "cache size in MB")
flag.BoolVar(&version, "version", false, "Print version and exit.")
flag.BoolVar(&interleaved, "interleaved", false, "Backend stores key in interleaved form.")
flag.StringVar(&gcDuration, "gc-duration", GCDuration, "Duration between forced GCs.")
flag.Parse()
if version {
fmt.Printf("Version: %s\n", Version)
os.Exit(0)
}
if flag.NArg() < 1 {
log.Fatal("Missing path to world")
}
var (
err error
backend Backend
gcDur time.Duration
)
if gcDur, err = time.ParseDuration(gcDuration); err != nil {
log.Fatal(err)
}
path := flag.Arg(0)
if driver == "sqlite" {
if backend, err = NewSqliteBackend(path, interleaved); err != nil {
log.Fatal(err)
}
} else {
if backend, err = NewLeveDBBackend(path, interleaved, cacheSize); err != nil {
log.Fatal(err)
}
}
defer backend.Shutdown()
var listener net.Listener
listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
log.Fatal(err)
}
defer listener.Close()
log.Printf("Server started at %s", listener.Addr())
connChan := make(chan net.Conn)
defer close(connChan)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, os.Kill)
go func() {
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal(err)
}
log.Printf("Client accepted from: %s", conn.RemoteAddr())
connChan <- conn
}
}()
log.Printf("Doing garbage collection every: %s", gcDur)
gcChan := time.Tick(gcDur)
for {
select {
case conn := <-connChan:
var session Session
if session, err = backend.NewSession(); err != nil {
log.Printf("Cannot create session: %s", err)
conn.Close()
} else {
go NewConnection(conn, session).Run()
}
case <-sigChan:
log.Println("Shutting down")
return
case <-gcChan:
log.Println("Starting garbage collection.")
runtime.GC()
log.Println("Garbage collection done.")
}
}
}
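
Since the server speaks a small subset of the Redis protocol (HGET, HSET, MULTI, EXEC, HKEYS plus the custom HSPATIAL), any RESP-capable client can talk to it. Below is a minimal, self-contained sketch of a raw HGET request; the address, hash and position values are purely illustrative, and the hash argument is not interpreted by the backends shown here.

// Illustrative sketch only: a tiny raw RESP client for the server above.
package main

import (
    "bufio"
    "fmt"
    "io"
    "net"
)

func fetchBlock(addr, hash, pos string) ([]byte, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    defer conn.Close()
    // HGET <hash> <pos> encoded as a RESP array of bulk strings.
    cmd := fmt.Sprintf("*3\r\n$4\r\nHGET\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n",
        len(hash), hash, len(pos), pos)
    if _, err := io.WriteString(conn, cmd); err != nil {
        return nil, err
    }
    r := bufio.NewReader(conn)
    header, err := r.ReadString('\n') // "$<len>\r\n" or "$-1\r\n" for a missing block
    if err != nil {
        return nil, err
    }
    var n int
    if _, err := fmt.Sscanf(header, "$%d", &n); err != nil || n < 0 {
        return nil, err // nil data: no such block or unexpected reply
    }
    data := make([]byte, n)
    if _, err := io.ReadFull(r, data); err != nil {
        return nil, err
    }
    return data, nil
}

func main() {
    data, err := fetchBlock("localhost:6379", "ignored", "0")
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("fetched", len(data), "bytes")
}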

cmd/mtredisalize/misc.go

@@ -0,0 +1,12 @@
// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
func order(a, b int64) (int64, int64) {
if a < b {
return a, b
}
return b, a
}

cmd/mtredisalize/parser.go

@@ -0,0 +1,269 @@
// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"strconv"
)
type RedisConsumer interface {
ConsumeInteger(int64) bool
ConsumeArray(int64) bool
ConsumeSimpleString(string) bool
ConsumeBulkString([]byte) bool
ConsumeError(error) bool
}
type RedisParser struct {
reader *bufio.Reader
consumer RedisConsumer
}
func NewRedisParser(reader *bufio.Reader, consumer RedisConsumer) *RedisParser {
return &RedisParser{
reader: reader,
consumer: consumer}
}
func (rp *RedisParser) Parse() {
for line := rp.nextLine(); line != nil && rp.dispatch(line); line = rp.nextLine() {
}
}
func (rp *RedisParser) nextLine() []byte {
line, err := rp.reader.ReadBytes('\n')
if err != nil {
if err != io.EOF {
rp.consumer.ConsumeError(err)
}
return nil
}
return bytes.TrimRight(line, "\r\n")
}
func (rp *RedisParser) dispatch(line []byte) bool {
if len(line) < 1 {
return false
}
switch line[0] {
case '-':
return true // ignore errors
case ':':
return rp.integer(line)
case '+':
return rp.simpleString(line)
case '$':
return rp.bulkString(line)
case '*':
return rp.array(line)
}
return true
}
func (rp *RedisParser) simpleString(line []byte) bool {
return rp.consumer.ConsumeSimpleString(string(line[1:]))
}
func (rp *RedisParser) integer(line []byte) bool {
i, err := strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return rp.consumer.ConsumeError(err)
}
return rp.consumer.ConsumeInteger(i)
}
func (rp *RedisParser) bulkString(line []byte) bool {
var i int64
var err error
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return rp.consumer.ConsumeError(err)
}
switch {
case i < 0:
return rp.consumer.ConsumeBulkString(nil)
case i == 0:
return rp.consumer.ConsumeBulkString([]byte{})
default:
if i >= 4*1024*1024 { // 4MB should be sufficient
return rp.consumer.ConsumeError(
fmt.Errorf("Bulk string too large (%d bytes).\n", i))
}
data := make([]byte, i)
for rest := i; rest > 0; {
var n int
if n, err = rp.reader.Read(data[i-rest : i]); err != nil {
return rp.consumer.ConsumeError(err)
}
rest -= int64(n)
}
if _, err = rp.reader.ReadBytes('\n'); err != nil {
return rp.consumer.ConsumeError(err)
}
return rp.consumer.ConsumeBulkString(data)
}
}
func (rp *RedisParser) array(line []byte) bool {
var i int64
var err error
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return rp.consumer.ConsumeError(err)
}
return rp.consumer.ConsumeArray(i)
}
type RedisCommands interface {
Hget(hash, key []byte) bool
Hset(hash, key, block []byte) bool
Multi() bool
Exec() bool
Hkeys(hash []byte) bool
HSpatial(hash, first, second []byte) bool
}
type RedisCommandExecutor struct {
commands RedisCommands
missing int64
args []interface{}
}
func NewRedisCommandExecutor(commands RedisCommands) *RedisCommandExecutor {
return &RedisCommandExecutor{
commands: commands,
missing: 0,
args: []interface{}{}}
}
func (rce *RedisCommandExecutor) push(i interface{}) bool {
rce.args = append(rce.args, i)
rce.missing--
if rce.missing <= 0 {
rce.missing = 0
res := rce.execute()
rce.args = []interface{}{}
return res
}
return true
}
func asString(i interface{}) string {
switch v := i.(type) {
case string:
return v
case []byte:
return string(v)
}
return fmt.Sprintf("%s", i)
}
func (rce *RedisCommandExecutor) execute() bool {
l := len(rce.args)
if l < 1 {
log.Printf("WARN: Too less argument for command.")
return false
}
cmd := asString(rce.args[0])
switch cmd {
case "HGET":
if l < 3 {
log.Println("WARN: Missing argments for HGET.")
return false
}
hash, ok1 := rce.args[1].([]byte)
key, ok2 := rce.args[2].([]byte)
if !ok1 || !ok2 {
log.Println("WARN: HGET data are not byte slices.")
return false
}
return rce.commands.Hget(hash, key)
case "HSET":
if l < 4 {
log.Println("WARN: Missing argments for HSET.")
return false
}
hash, ok1 := rce.args[1].([]byte)
key, ok2 := rce.args[2].([]byte)
value, ok3 := rce.args[3].([]byte)
if !ok1 || !ok2 || !ok3 {
log.Println("WARN: HSET data are not byte slices.")
return false
}
return rce.commands.Hset(hash, key, value)
case "MULTI":
return rce.commands.Multi()
case "EXEC":
return rce.commands.Exec()
case "HKEYS":
if l < 2 {
log.Println("WARN: Missing argments for HKEYS.")
return false
}
hash, ok := rce.args[1].([]byte)
if !ok {
log.Println("WARN: HKEYS data are not byte slices.")
return false
}
return rce.commands.Hkeys(hash)
case "HSPATIAL":
if l < 4 {
log.Println("WARN: Missing argments for HSPATIAL.")
return false
}
hash, ok1 := rce.args[1].([]byte)
first, ok2 := rce.args[2].([]byte)
second, ok3 := rce.args[3].([]byte)
if !ok1 || !ok2 || !ok3 {
log.Println("WARN: HSPATIAL data are not byte slices.")
return false
}
return rce.commands.HSpatial(hash, first, second)
}
log.Printf("WARN: unknown command: '%s'\n", cmd)
return false
}
func (rce *RedisCommandExecutor) ConsumeSimpleString(s string) bool {
return rce.push(s)
}
func (rce *RedisCommandExecutor) ConsumeBulkString(data []byte) bool {
return rce.push(data)
}
func (rce *RedisCommandExecutor) ConsumeInteger(i int64) bool {
return rce.push(i)
}
func (rce *RedisCommandExecutor) ConsumeError(err error) bool {
log.Printf("error: %s", err)
return true
}
func (rce *RedisCommandExecutor) ConsumeArray(i int64) bool {
if rce.missing > 0 {
log.Println("WARN: Nested arrays are not supported!")
return false
}
if i < 0 {
log.Println("Null arrays are not supported")
return false
}
rce.missing = i
return true
}
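
The parser is push-based: it reads RESP tokens from a bufio.Reader and hands them to a RedisConsumer, and RedisCommandExecutor is the consumer that regroups those tokens into commands. A minimal, illustrative consumer that only logs what it sees can help when debugging the wire format (the type and function names below are assumptions):

// Illustrative sketch only: feeding the parser with a consumer that just logs tokens.
type loggingConsumer struct{}

func (loggingConsumer) ConsumeInteger(i int64) bool { log.Printf("integer: %d", i); return true }
func (loggingConsumer) ConsumeArray(n int64) bool { log.Printf("array of %d elements", n); return true }
func (loggingConsumer) ConsumeSimpleString(s string) bool { log.Printf("simple string: %s", s); return true }
func (loggingConsumer) ConsumeBulkString(b []byte) bool { log.Printf("bulk string: %q", b); return true }
func (loggingConsumer) ConsumeError(err error) bool { log.Printf("protocol error: %s", err); return true }

func parseExample() {
    input := []byte("*3\r\n$4\r\nHSET\r\n$1\r\n0\r\n$5\r\nhello\r\n")
    parser := NewRedisParser(bufio.NewReader(bytes.NewReader(input)), loggingConsumer{})
    parser.Parse()
}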

cmd/mtredisalize/sqlite.go

@@ -0,0 +1,439 @@
// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"database/sql"
"log"
"sync"
_ "github.com/mattn/go-sqlite3"
"bitbucket.org/s_l_teichmann/mtredisalize/common"
)
var globalLock sync.RWMutex
const (
fetchSql = "SELECT data FROM blocks WHERE pos = ?"
existsSql = "SELECT 1 FROM blocks WHERE pos = ?"
updateSql = "UPDATE blocks SET data = ? WHERE pos = ?"
insertSql = "INSERT INTO blocks (pos, data) VALUES (?, ?)"
countSql = "SELECT count(*) FROM blocks"
keysSql = "SELECT pos FROM blocks"
rangeSql = "SELECT pos, data FROM blocks WHERE pos BETWEEN ? AND ? ORDER BY pos"
)
type SqliteBackend struct {
db *sql.DB
encoder common.KeyEncoder
decoder common.KeyDecoder
interleaved bool
existsStmt *sql.Stmt
fetchStmt *sql.Stmt
insertStmt *sql.Stmt
updateStmt *sql.Stmt
countStmt *sql.Stmt
keysStmt *sql.Stmt
rangeStmt *sql.Stmt
}
type SqliteSession struct {
backend *SqliteBackend
tx *sql.Tx
}
func (ss *SqliteBackend) NewSession() (Session, error) {
return &SqliteSession{ss, nil}, nil
}
func (ss *SqliteSession) Close() error {
t := ss.tx
if t != nil {
ss.tx = nil
return t.Rollback()
}
return nil
}
func NewSqliteBackend(path string, interleaved bool) (sqlb *SqliteBackend, err error) {
res := SqliteBackend{interleaved: interleaved}
if res.db, err = sql.Open("sqlite3", path); err != nil {
return
}
if res.existsStmt, err = res.db.Prepare(existsSql); err != nil {
res.closeAll()
return
}
if res.fetchStmt, err = res.db.Prepare(fetchSql); err != nil {
res.closeAll()
return
}
if res.insertStmt, err = res.db.Prepare(insertSql); err != nil {
res.closeAll()
return
}
if res.updateStmt, err = res.db.Prepare(updateSql); err != nil {
res.closeAll()
return
}
if res.countStmt, err = res.db.Prepare(countSql); err != nil {
res.closeAll()
return
}
if res.keysStmt, err = res.db.Prepare(keysSql); err != nil {
res.closeAll()
return
}
if res.rangeStmt, err = res.db.Prepare(rangeSql); err != nil {
res.closeAll()
return
}
if interleaved {
res.encoder = common.EncodeStringToBytesFromInterleaved
res.decoder = common.DecodeStringFromBytesToInterleaved
} else {
res.encoder = common.EncodeStringToBytes
res.decoder = common.DecodeStringFromBytes
}
sqlb = &res
return
}
func closeStmt(stmt **sql.Stmt) error {
s := *stmt
if s != nil {
*stmt = nil
return s.Close()
}
return nil
}
func closeDB(db **sql.DB) error {
d := *db
if d != nil {
*db = nil
return d.Close()
}
return nil
}
func (sqlb *SqliteBackend) closeAll() error {
closeStmt(&sqlb.fetchStmt)
closeStmt(&sqlb.insertStmt)
closeStmt(&sqlb.updateStmt)
closeStmt(&sqlb.existsStmt)
closeStmt(&sqlb.countStmt)
closeStmt(&sqlb.keysStmt)
closeStmt(&sqlb.rangeStmt)
return closeDB(&sqlb.db)
}
func (sqlb *SqliteBackend) Shutdown() error {
globalLock.Lock()
defer globalLock.Unlock()
return sqlb.closeAll()
}
func (ss *SqliteSession) txStmt(stmt *sql.Stmt) *sql.Stmt {
if ss.tx != nil {
return ss.tx.Stmt(stmt)
}
return stmt
}
func (ss *SqliteSession) Fetch(hash, key []byte) (data []byte, err error) {
var pos int64
if pos, err = ss.backend.decoder(key); err != nil {
return
}
globalLock.RLock()
defer globalLock.RUnlock()
fetchStmt := ss.txStmt(ss.backend.fetchStmt)
err2 := fetchStmt.QueryRow(pos).Scan(&data)
if err2 == sql.ErrNoRows {
return
}
err = err2
return
}
func (ss *SqliteSession) InTransaction() bool {
return ss.tx != nil
}
func (ss *SqliteSession) Store(hash, key, value []byte) (exists bool, err error) {
var pos int64
if pos, err = ss.backend.decoder(key); err != nil {
return
}
globalLock.Lock()
defer globalLock.Unlock()
existsStmt := ss.txStmt(ss.backend.existsStmt)
var x int
err2 := existsStmt.QueryRow(pos).Scan(&x)
if err2 == sql.ErrNoRows {
exists = false
} else if err2 != nil {
err = err2
return
} else {
exists = true
}
if exists {
updateStmt := ss.txStmt(ss.backend.updateStmt)
_, err = updateStmt.Exec(value, pos)
} else {
insertStmt := ss.txStmt(ss.backend.insertStmt)
_, err = insertStmt.Exec(pos, value)
}
return
}
func (ss *SqliteSession) BeginTransaction() (err error) {
if ss.tx != nil {
log.Println("WARN: Already running transaction.")
return nil
}
globalLock.Lock()
defer globalLock.Unlock()
ss.tx, err = ss.backend.db.Begin()
return
}
func (ss *SqliteSession) CommitTransaction() error {
tx := ss.tx
if tx == nil {
log.Println("WARN: No transaction running.")
return nil
}
globalLock.Lock()
defer globalLock.Unlock()
ss.tx = nil
return tx.Commit()
}
func (ss *SqliteSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) {
globalLock.RLock()
countStmt := ss.txStmt(ss.backend.countStmt)
if err = countStmt.QueryRow().Scan(&n); err != nil {
if err == sql.ErrNoRows {
err = nil
}
globalLock.RUnlock()
return
}
keysStmt := ss.txStmt(ss.backend.keysStmt)
var rows *sql.Rows
if rows, err = keysStmt.Query(); err != nil {
globalLock.RUnlock()
return
}
keys = make(chan []byte)
go func() {
defer globalLock.RUnlock()
defer rows.Close()
defer close(keys)
var err error
for rows.Next() {
var key int64
if err := rows.Scan(&key); err != nil {
log.Printf("WARN: %s", err)
break
}
var encoded []byte
if encoded, err = ss.backend.encoder(key); err != nil {
log.Printf("Cannot encode key: %d %s\n", key, err)
break
}
select {
case keys <- encoded:
case <-done:
return
}
}
}()
return
}
func (ss *SqliteSession) SpatialQuery(hash, first, second []byte, done chan struct{}) (chan Block, error) {
if ss.backend.interleaved {
return ss.interleavedSpatialQuery(first, second, done)
}
return ss.plainSpatialQuery(first, second, done)
}
func (ss *SqliteSession) interleavedSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) {
var (
firstKey int64
secondKey int64
)
if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
return
}
if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
return
}
c1 := common.ClipCoord(common.PlainToCoord(firstKey))
c2 := common.ClipCoord(common.PlainToCoord(secondKey))
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks = make(chan Block)
globalLock.RLock()
go func() {
defer close(blocks)
defer globalLock.RUnlock()
zmin, zmax := common.CoordToInterleaved(c1), common.CoordToInterleaved(c2)
// Should not be necessary.
zmin, zmax = order(zmin, zmax)
cub := common.Cuboid{P1: c1, P2: c2}
rangeStmt := ss.txStmt(ss.backend.rangeStmt)
OUTER:
for {
var (
err error
rows *sql.Rows
)
if rows, err = rangeStmt.Query(zmin, zmax); err != nil {
log.Printf("Error in range query: %s", err)
return
}
for rows.Next() {
var zcode int64
var data []byte
if err = rows.Scan(&zcode, &data); err != nil {
log.Printf("Error in scanning row: %s", err)
rows.Close()
return
}
c := common.InterleavedToCoord(zcode)
if cub.Contains(c) {
var encodedKey []byte
if encodedKey, err = common.EncodeStringToBytes(common.CoordToPlain(c)); err != nil {
log.Printf("Key encoding failed: %s", err)
rows.Close()
return
}
select {
case blocks <- Block{Key: encodedKey, Data: data}:
case <-done:
rows.Close()
return
}
} else { // Left the cuboid
rows.Close()
zmin = common.BigMin(zmin, zmax, zcode)
continue OUTER
}
}
if err = rows.Err(); err != nil {
log.Printf("Error in range query: %s", err)
}
rows.Close()
// All rows of this z range were inside the cuboid: the query is complete.
return
}
}()
return
}
func (ss *SqliteSession) plainSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) {
var (
firstKey int64
secondKey int64
)
if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
return
}
if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
return
}
c1 := common.PlainToCoord(firstKey)
c2 := common.PlainToCoord(secondKey)
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks = make(chan Block)
globalLock.RLock()
go func() {
defer globalLock.RUnlock()
defer close(blocks)
rangeStmt := ss.txStmt(ss.backend.rangeStmt)
a, b := common.Coord{X: c1.X}, common.Coord{X: c2.X}
for a.Z = c1.Z; a.Z <= c2.Z; a.Z++ {
b.Z = a.Z
for a.Y = c1.Y; a.Y <= c2.Y; a.Y++ {
b.Y = a.Y
var (
err error
rows *sql.Rows
)
// Ordering should not be necessary.
from, to := order(common.CoordToPlain(a), common.CoordToPlain(b))
if rows, err = rangeStmt.Query(from, to); err != nil {
log.Printf("Error in range query: %s", err)
return
}
for rows.Next() {
var key int64
var data []byte
if err = rows.Scan(&key, &data); err != nil {
log.Printf("Error in scanning row: %s", err)
break
}
var encodedKey []byte
if encodedKey, err = common.EncodeStringToBytes(key); err != nil {
log.Printf("Key encoding failed: %s", err)
break
}
select {
case blocks <- Block{Key: encodedKey, Data: data}:
case <-done:
rows.Close()
return
}
}
if err = rows.Err(); err != nil {
log.Printf("Error in range query: %s", err)
}
rows.Close()
}
}
}()
return
}
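
All statements above address a single blocks table with pos and data columns, which matches the layout of a Minetest map.sqlite world database. For orientation, a sketch of the schema these queries assume (the exact DDL in a given world file may differ slightly):

// Illustrative sketch only: the table layout the prepared statements rely on.
const assumedSchemaSql = `
CREATE TABLE IF NOT EXISTS blocks (
    pos  INT NOT NULL PRIMARY KEY,
    data BLOB
);`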