From 712708f87c968b2c7b2c8f9e5bf0189668588848 Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Fri, 22 Aug 2014 22:26:03 +0200 Subject: [PATCH] Added transcoder functions to LevelDB backend. --- common/coords.go | 5 +++++ leveldb.go | 43 +++++++++++++++++++++++++++++-------------- main.go | 6 +++++- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/common/coords.go b/common/coords.go index 2ca3189..e7c3490 100644 --- a/common/coords.go +++ b/common/coords.go @@ -24,6 +24,7 @@ type ( KeyTransformer func(int64) int64 KeyEncoder func(int64) ([]byte, error) KeyDecoder func([]byte) (int64, error) + KeyTranscoder func([]byte) ([]byte, error) KeySplitter func(int64) Coord KeyJoiner func(Coord) int64 ) @@ -156,6 +157,10 @@ func EncodeStringToBytesFromInterleaved(key int64) ([]byte, error) { return EncodeStringToBytes(TransformInterleavedToPlain(key)) } +func IdentityTranscoder(key []byte) ([]byte, error) { + return key, nil +} + func SelectKeySplitter(interleaved bool) KeySplitter { if interleaved { return InterleavedToCoord diff --git a/leveldb.go b/leveldb.go index 64cf1d5..b0715a1 100644 --- a/leveldb.go +++ b/leveldb.go @@ -8,13 +8,17 @@ import ( "log" "sync" + "bitbucket.org/s_l_teichmann/mtredisalize/common" + leveldb "github.com/jmhodges/levigo" ) type LevelDBBackend struct { - cache *leveldb.Cache - db *leveldb.DB - mutex sync.RWMutex + cache *leveldb.Cache + db *leveldb.DB + encoder common.KeyTranscoder + decoder common.KeyTranscoder + mutex sync.RWMutex } type LevelDBSession struct { @@ -22,7 +26,11 @@ type LevelDBSession struct { tx *leveldb.WriteBatch } -func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err error) { +func NewLeveDBBackend( + path string, + encoder common.KeyTranscoder, + decoder common.KeyTranscoder, + cacheSize int) (ldb *LevelDBBackend, err error) { opts := leveldb.NewOptions() var cache *leveldb.Cache @@ -41,8 +49,10 @@ func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err erro return } ldb = &LevelDBBackend{ - cache: cache, - db: db} + cache: cache, + db: db, + encoder: encoder, + decoder: decoder} return } @@ -78,7 +88,9 @@ func (ldb *LevelDBBackend) Shutdown() error { } func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { - + if key, err = ldbs.backend.decoder(key); err != nil { + return + } ldbs.backend.doRead(func(db *leveldb.DB) { ro := leveldb.NewReadOptions() value, err = ldbs.backend.db.Get(ro, key) @@ -108,24 +120,21 @@ func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) { } func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) { - + if key, err = ldbs.backend.decoder(key); err != nil { + return + } ldbs.backend.doWrite(func(db *leveldb.DB) { if exists, err = keyExists(db, key); err != nil { return } - if ldbs.tx != nil { - //log.Printf("Stored in tx: key = '%s' len(value) = %d\n", key, len(value)) ldbs.tx.Put(key, value) return } - - //log.Printf("Stored: key = '%s' len(value) = %d\n", key, len(value)) wo := leveldb.NewWriteOptions() err = ldbs.backend.db.Put(wo, key, value) wo.Close() }) - return } @@ -178,8 +187,14 @@ func (ldbs *LevelDBSession) AllKeys(hash []byte) (keys chan []byte, n int, err e defer close(keys) defer it.Close() it.SeekToFirst() + encoder := ldbs.backend.encoder for ; it.Valid(); it.Next() { - keys <- it.Key() + if key, err := encoder(it.Key()); err == nil { + keys <- key + } else { + log.Printf("WARN: %s\n", err) + return + } } if err := it.GetError(); err != nil { log.Printf("WARN: %s\n", err) diff --git a/main.go b/main.go index bdf38df..17ffa9b 100644 --- a/main.go +++ b/main.go @@ -70,7 +70,11 @@ func main() { log.Fatal(err) } } else { - if backend, err = NewLeveDBBackend(args[0], cacheSize); err != nil { + if backend, err = NewLeveDBBackend( + args[0], + common.IdentityTranscoder, + common.IdentityTranscoder, + cacheSize); err != nil { log.Fatal(err) } }