mtredisalize: Add infrastructure to send backend store changes to separate daemons.

This commit is contained in:
Sascha L. Teichmann 2014-09-17 09:51:34 +02:00
parent ef541f18f6
commit 400a65f563
5 changed files with 132 additions and 40 deletions

View File

@ -0,0 +1,33 @@
package main
import (
"log"
"sync"
"bitbucket.org/s_l_teichmann/mtredisalize/common"
)
type ChangeTracker struct {
mutex sync.Mutex
}
func NewChangeTracker() *ChangeTracker {
return &ChangeTracker{}
}
func (ct *ChangeTracker) BlockChanged(key []byte) {
ct.mutex.Lock()
defer ct.mutex.Unlock()
var err error
var coord common.Coord
if coord, err = common.DecodeStringBytesToCoord(key); err != nil {
log.Printf("decoding key failed: %s", err)
return
}
log.Printf("changed block: %v", coord)
}
func (ct *ChangeTracker) FlushChanges(url string) (err error) {
log.Println("change flush triggered")
return
}

View File

@ -14,12 +14,13 @@ import (
) )
type LevelDBBackend struct { type LevelDBBackend struct {
cache *leveldb.Cache cache *leveldb.Cache
db *leveldb.DB db *leveldb.DB
interleaved bool interleaved bool
encoder common.KeyTranscoder encoder common.KeyTranscoder
decoder common.KeyTranscoder decoder common.KeyTranscoder
mutex sync.RWMutex changeTracker *ChangeTracker
mutex sync.RWMutex
} }
type LevelDBSession struct { type LevelDBSession struct {
@ -29,6 +30,7 @@ type LevelDBSession struct {
func NewLeveDBBackend( func NewLeveDBBackend(
path string, path string,
changeTracker *ChangeTracker,
interleaved bool, interleaved bool,
cacheSize int) (ldb *LevelDBBackend, err error) { cacheSize int) (ldb *LevelDBBackend, err error) {
@ -63,11 +65,12 @@ func NewLeveDBBackend(
} }
ldb = &LevelDBBackend{ ldb = &LevelDBBackend{
cache: cache, cache: cache,
db: db, db: db,
interleaved: interleaved, interleaved: interleaved,
encoder: encoder, encoder: encoder,
decoder: decoder} decoder: decoder,
changeTracker: changeTracker}
return return
} }
@ -135,6 +138,7 @@ func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) {
} }
func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) { func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) {
origKey := key
if key, err = ldbs.backend.decoder(key); err != nil { if key, err = ldbs.backend.decoder(key); err != nil {
return return
} }
@ -150,6 +154,11 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err
err = db.Put(wo, key, value) err = db.Put(wo, key, value)
wo.Close() wo.Close()
}) })
// This technically too early because this done in transactions
// which are commited (and possible fail) later.
if ldbs.backend.changeTracker != nil {
ldbs.backend.changeTracker.BlockChanged(origKey)
}
return return
} }

View File

@ -16,8 +16,9 @@ import (
) )
const ( const (
Version = "0.3" Version = "0.3"
GCDuration = "24h" GCDuration = "24h"
ChangeDuration = "30s"
) )
func usage() { func usage() {
@ -30,13 +31,15 @@ func usage() {
func main() { func main() {
var ( var (
port int port int
host string host string
driver string driver string
cacheSize int cacheSize int
version bool version bool
interleaved bool interleaved bool
gcDuration string changeUrl string
gcDuration string
changeDuration string
) )
flag.Usage = usage flag.Usage = usage
@ -46,8 +49,13 @@ func main() {
flag.StringVar(&host, "host", "", "host to bind") flag.StringVar(&host, "host", "", "host to bind")
flag.IntVar(&cacheSize, "cache", 32, "cache size in MB") flag.IntVar(&cacheSize, "cache", 32, "cache size in MB")
flag.BoolVar(&version, "version", false, "Print version and exit.") flag.BoolVar(&version, "version", false, "Print version and exit.")
flag.BoolVar(&interleaved, "interleaved", false, "Backend stores key in interleaved form.") flag.BoolVar(&interleaved,
flag.StringVar(&gcDuration, "gc-duration", GCDuration, "Duration between forced GCs.") "interleaved", false, "Backend stores key in interleaved form.")
flag.StringVar(&gcDuration,
"gc-duration", GCDuration, "Duration between forced GCs.")
flag.StringVar(&changeDuration,
"change-duration", ChangeDuration, "Duration to aggregate changes.")
flag.StringVar(&changeUrl, "change-url", "", "URL to send changes to.")
flag.Parse() flag.Parse()
if version { if version {
@ -60,26 +68,47 @@ func main() {
} }
var ( var (
err error err error
backend Backend backend Backend
gcDur time.Duration gcDur time.Duration
chDur time.Duration
changeTracker *ChangeTracker
) )
if gcDur, err = time.ParseDuration(gcDuration); err != nil { if gcDur, err = time.ParseDuration(gcDuration); err != nil {
log.Fatal(err) log.Fatal(err)
} }
// Setup the change listening stuff.
var changeChan <-chan time.Time
useChangeNotification := changeUrl != ""
if useChangeNotification {
if chDur, err = time.ParseDuration(changeDuration); err != nil {
log.Fatal(err)
}
changeChan = time.Tick(chDur)
changeTracker = NewChangeTracker()
} else {
// We will never receive ticks on this.
changeChan = make(<-chan time.Time)
}
path := flag.Arg(0) path := flag.Arg(0)
if driver == "sqlite" { if driver == "sqlite" {
if backend, err = NewSqliteBackend(path, interleaved); err != nil { if backend, err = NewSqliteBackend(path, changeTracker, interleaved); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} else { } else {
if backend, err = NewLeveDBBackend(path, interleaved, cacheSize); err != nil { if backend, err = NewLeveDBBackend(
path, changeTracker, interleaved, cacheSize); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
defer backend.Shutdown() defer backend.Shutdown()
var listener net.Listener var listener net.Listener
@ -127,6 +156,10 @@ func main() {
log.Println("Starting garbage collection.") log.Println("Starting garbage collection.")
runtime.GC() runtime.GC()
log.Println("Garbage collection done.") log.Println("Garbage collection done.")
case <-changeChan:
if changeTracker != nil {
changeTracker.FlushChanges(changeUrl)
}
} }
} }
} }

View File

@ -27,17 +27,18 @@ const (
) )
type SqliteBackend struct { type SqliteBackend struct {
db *sql.DB db *sql.DB
encoder common.KeyEncoder encoder common.KeyEncoder
decoder common.KeyDecoder decoder common.KeyDecoder
interleaved bool changeTracker *ChangeTracker
existsStmt *sql.Stmt interleaved bool
fetchStmt *sql.Stmt existsStmt *sql.Stmt
insertStmt *sql.Stmt fetchStmt *sql.Stmt
updateStmt *sql.Stmt insertStmt *sql.Stmt
countStmt *sql.Stmt updateStmt *sql.Stmt
keysStmt *sql.Stmt countStmt *sql.Stmt
rangeStmt *sql.Stmt keysStmt *sql.Stmt
rangeStmt *sql.Stmt
} }
type SqliteSession struct { type SqliteSession struct {
@ -58,9 +59,11 @@ func (ss *SqliteSession) Close() error {
return nil return nil
} }
func NewSqliteBackend(path string, interleaved bool) (sqlb *SqliteBackend, err error) { func NewSqliteBackend(
path string,
changeTracker *ChangeTracker, interleaved bool) (sqlb *SqliteBackend, err error) {
res := SqliteBackend{interleaved: interleaved} res := SqliteBackend{interleaved: interleaved, changeTracker: changeTracker}
if res.db, err = sql.Open("sqlite3", path); err != nil { if res.db, err = sql.Open("sqlite3", path); err != nil {
return return
@ -207,6 +210,11 @@ func (ss *SqliteSession) Store(hash, key, value []byte) (exists bool, err error)
insertStmt := ss.txStmt(ss.backend.insertStmt) insertStmt := ss.txStmt(ss.backend.insertStmt)
_, err = insertStmt.Exec(pos, value) _, err = insertStmt.Exec(pos, value)
} }
// This technically too early because this done in transactions
// which are commited (and possible fail) later.
if ss.backend.changeTracker != nil {
ss.backend.changeTracker.BlockChanged(key)
}
return return
} }

View File

@ -216,6 +216,15 @@ func DecodeStringFromBytesToInterleaved(key []byte) (v int64, err error) {
return return
} }
func DecodeStringBytesToCoord(key []byte) (coord Coord, err error) {
var k int64
if k, err = DecodeStringFromBytes(key); err != nil {
return
}
coord = PlainToCoord(k)
return
}
func EncodeStringToBytesFromInterleaved(key int64) ([]byte, error) { func EncodeStringToBytesFromInterleaved(key int64) ([]byte, error) {
return EncodeStringToBytes(TransformInterleavedToPlain(key)) return EncodeStringToBytes(TransformInterleavedToPlain(key))
} }