diff --git a/cmd/mtredisalize/changetracker.go b/cmd/mtredisalize/changetracker.go new file mode 100644 index 0000000..0b9973c --- /dev/null +++ b/cmd/mtredisalize/changetracker.go @@ -0,0 +1,33 @@ +package main + +import ( + "log" + "sync" + + "bitbucket.org/s_l_teichmann/mtredisalize/common" +) + +type ChangeTracker struct { + mutex sync.Mutex +} + +func NewChangeTracker() *ChangeTracker { + return &ChangeTracker{} +} + +func (ct *ChangeTracker) BlockChanged(key []byte) { + ct.mutex.Lock() + defer ct.mutex.Unlock() + var err error + var coord common.Coord + if coord, err = common.DecodeStringBytesToCoord(key); err != nil { + log.Printf("decoding key failed: %s", err) + return + } + log.Printf("changed block: %v", coord) +} + +func (ct *ChangeTracker) FlushChanges(url string) (err error) { + log.Println("change flush triggered") + return +} diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index 8a1e357..6638a19 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -14,12 +14,13 @@ import ( ) type LevelDBBackend struct { - cache *leveldb.Cache - db *leveldb.DB - interleaved bool - encoder common.KeyTranscoder - decoder common.KeyTranscoder - mutex sync.RWMutex + cache *leveldb.Cache + db *leveldb.DB + interleaved bool + encoder common.KeyTranscoder + decoder common.KeyTranscoder + changeTracker *ChangeTracker + mutex sync.RWMutex } type LevelDBSession struct { @@ -29,6 +30,7 @@ type LevelDBSession struct { func NewLeveDBBackend( path string, + changeTracker *ChangeTracker, interleaved bool, cacheSize int) (ldb *LevelDBBackend, err error) { @@ -63,11 +65,12 @@ func NewLeveDBBackend( } ldb = &LevelDBBackend{ - cache: cache, - db: db, - interleaved: interleaved, - encoder: encoder, - decoder: decoder} + cache: cache, + db: db, + interleaved: interleaved, + encoder: encoder, + decoder: decoder, + changeTracker: changeTracker} return } @@ -135,6 +138,7 @@ func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) { } func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) { + origKey := key if key, err = ldbs.backend.decoder(key); err != nil { return } @@ -150,6 +154,11 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err err = db.Put(wo, key, value) wo.Close() }) + // This technically too early because this done in transactions + // which are commited (and possible fail) later. + if ldbs.backend.changeTracker != nil { + ldbs.backend.changeTracker.BlockChanged(origKey) + } return } diff --git a/cmd/mtredisalize/main.go b/cmd/mtredisalize/main.go index 23c9f5a..7a3c2f6 100644 --- a/cmd/mtredisalize/main.go +++ b/cmd/mtredisalize/main.go @@ -16,8 +16,9 @@ import ( ) const ( - Version = "0.3" - GCDuration = "24h" + Version = "0.3" + GCDuration = "24h" + ChangeDuration = "30s" ) func usage() { @@ -30,13 +31,15 @@ func usage() { func main() { var ( - port int - host string - driver string - cacheSize int - version bool - interleaved bool - gcDuration string + port int + host string + driver string + cacheSize int + version bool + interleaved bool + changeUrl string + gcDuration string + changeDuration string ) flag.Usage = usage @@ -46,8 +49,13 @@ func main() { flag.StringVar(&host, "host", "", "host to bind") flag.IntVar(&cacheSize, "cache", 32, "cache size in MB") flag.BoolVar(&version, "version", false, "Print version and exit.") - flag.BoolVar(&interleaved, "interleaved", false, "Backend stores key in interleaved form.") - flag.StringVar(&gcDuration, "gc-duration", GCDuration, "Duration between forced GCs.") + flag.BoolVar(&interleaved, + "interleaved", false, "Backend stores key in interleaved form.") + flag.StringVar(&gcDuration, + "gc-duration", GCDuration, "Duration between forced GCs.") + flag.StringVar(&changeDuration, + "change-duration", ChangeDuration, "Duration to aggregate changes.") + flag.StringVar(&changeUrl, "change-url", "", "URL to send changes to.") flag.Parse() if version { @@ -60,26 +68,47 @@ func main() { } var ( - err error - backend Backend - gcDur time.Duration + err error + backend Backend + gcDur time.Duration + chDur time.Duration + changeTracker *ChangeTracker ) if gcDur, err = time.ParseDuration(gcDuration); err != nil { log.Fatal(err) } + // Setup the change listening stuff. + + var changeChan <-chan time.Time + + useChangeNotification := changeUrl != "" + + if useChangeNotification { + if chDur, err = time.ParseDuration(changeDuration); err != nil { + log.Fatal(err) + } + changeChan = time.Tick(chDur) + changeTracker = NewChangeTracker() + } else { + // We will never receive ticks on this. + changeChan = make(<-chan time.Time) + } + path := flag.Arg(0) if driver == "sqlite" { - if backend, err = NewSqliteBackend(path, interleaved); err != nil { + if backend, err = NewSqliteBackend(path, changeTracker, interleaved); err != nil { log.Fatal(err) } } else { - if backend, err = NewLeveDBBackend(path, interleaved, cacheSize); err != nil { + if backend, err = NewLeveDBBackend( + path, changeTracker, interleaved, cacheSize); err != nil { log.Fatal(err) } } + defer backend.Shutdown() var listener net.Listener @@ -127,6 +156,10 @@ func main() { log.Println("Starting garbage collection.") runtime.GC() log.Println("Garbage collection done.") + case <-changeChan: + if changeTracker != nil { + changeTracker.FlushChanges(changeUrl) + } } } } diff --git a/cmd/mtredisalize/sqlite.go b/cmd/mtredisalize/sqlite.go index be1c9ac..741ced0 100644 --- a/cmd/mtredisalize/sqlite.go +++ b/cmd/mtredisalize/sqlite.go @@ -27,17 +27,18 @@ const ( ) type SqliteBackend struct { - db *sql.DB - encoder common.KeyEncoder - decoder common.KeyDecoder - interleaved bool - existsStmt *sql.Stmt - fetchStmt *sql.Stmt - insertStmt *sql.Stmt - updateStmt *sql.Stmt - countStmt *sql.Stmt - keysStmt *sql.Stmt - rangeStmt *sql.Stmt + db *sql.DB + encoder common.KeyEncoder + decoder common.KeyDecoder + changeTracker *ChangeTracker + interleaved bool + existsStmt *sql.Stmt + fetchStmt *sql.Stmt + insertStmt *sql.Stmt + updateStmt *sql.Stmt + countStmt *sql.Stmt + keysStmt *sql.Stmt + rangeStmt *sql.Stmt } type SqliteSession struct { @@ -58,9 +59,11 @@ func (ss *SqliteSession) Close() error { return nil } -func NewSqliteBackend(path string, interleaved bool) (sqlb *SqliteBackend, err error) { +func NewSqliteBackend( + path string, + changeTracker *ChangeTracker, interleaved bool) (sqlb *SqliteBackend, err error) { - res := SqliteBackend{interleaved: interleaved} + res := SqliteBackend{interleaved: interleaved, changeTracker: changeTracker} if res.db, err = sql.Open("sqlite3", path); err != nil { return @@ -207,6 +210,11 @@ func (ss *SqliteSession) Store(hash, key, value []byte) (exists bool, err error) insertStmt := ss.txStmt(ss.backend.insertStmt) _, err = insertStmt.Exec(pos, value) } + // This technically too early because this done in transactions + // which are commited (and possible fail) later. + if ss.backend.changeTracker != nil { + ss.backend.changeTracker.BlockChanged(key) + } return } diff --git a/common/coords.go b/common/coords.go index 4c32122..c2816e9 100644 --- a/common/coords.go +++ b/common/coords.go @@ -216,6 +216,15 @@ func DecodeStringFromBytesToInterleaved(key []byte) (v int64, err error) { return } +func DecodeStringBytesToCoord(key []byte) (coord Coord, err error) { + var k int64 + if k, err = DecodeStringFromBytes(key); err != nil { + return + } + coord = PlainToCoord(k) + return +} + func EncodeStringToBytesFromInterleaved(key int64) ([]byte, error) { return EncodeStringToBytes(TransformInterleavedToPlain(key)) }