From 87b34d123e67cfc45d44259f4a4a2658310209d2 Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Wed, 20 Aug 2014 16:21:44 +0200 Subject: [PATCH] Added block producer for LevelDB in interleaver. --- interleaver/leveldb.go | 92 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 87 insertions(+), 5 deletions(-) diff --git a/interleaver/leveldb.go b/interleaver/leveldb.go index 4941697..6a854cf 100644 --- a/interleaver/leveldb.go +++ b/interleaver/leveldb.go @@ -5,16 +5,96 @@ package main import ( + "os" + "bitbucket.org/s_l_teichmann/mtredisalize/common" leveldb "github.com/jmhodges/levigo" ) -type LevelDBBlockConsumer struct { - db *leveldb.DB - wo *leveldb.WriteOptions - joiner common.KeyJoiner - encoder common.KeyEncoder +type ( + LevelDBBlockProducer struct { + db *leveldb.DB + opts *leveldb.Options + ro *leveldb.ReadOptions + iterator *leveldb.Iterator + splitter common.KeySplitter + decoder common.KeyDecoder + } + + LevelDBBlockConsumer struct { + db *leveldb.DB + opts *leveldb.Options + wo *leveldb.WriteOptions + joiner common.KeyJoiner + encoder common.KeyEncoder + } +) + +func NewLevelDBlockProducer(path string, + splitter common.KeySplitter, + decoder common.KeyDecoder) (ldbp *LevelDBBlockProducer, err error) { + + // check if we can stat it -> exists. + if _, err = os.Stat(path); err != nil { + return + } + + opts := leveldb.NewOptions() + opts.SetCreateIfMissing(false) + + var db *leveldb.DB + if db, err = leveldb.Open(path, opts); err != nil { + opts.Close() + return + } + + ro := leveldb.NewReadOptions() + ro.SetFillCache(false) + + iterator := db.NewIterator(ro) + iterator.SeekToFirst() + + ldbp = &LevelDBBlockProducer{ + db: db, + opts: opts, + ro: ro, + iterator: iterator, + splitter: splitter, + decoder: decoder} + return +} + +func (ldbp *LevelDBBlockProducer) Close() error { + if ldbp.iterator != nil { + ldbp.iterator.Close() + } + ldbp.ro.Close() + ldbp.db.Close() + ldbp.opts.Close() + return nil +} + +func (ldbp *LevelDBBlockProducer) Next(block *Block) (err error) { + if ldbp.iterator == nil { + err = NoMoreBlocksErr + return + } + if !ldbp.iterator.Valid() { + if err = ldbp.iterator.GetError(); err == nil { + err = NoMoreBlocksErr + } + ldbp.iterator.Close() + ldbp.iterator = nil + return + } + var key int64 + if key, err = ldbp.decoder(ldbp.iterator.Key()); err != nil { + return + } + block.Coord = ldbp.splitter(key) + block.Data = ldbp.iterator.Value() + return } func NewLevelDBBlockConsumer( @@ -32,6 +112,7 @@ func NewLevelDBBlockConsumer( ldbc = &LevelDBBlockConsumer{ db: db, + opts: opts, wo: leveldb.NewWriteOptions(), joiner: joiner, encoder: encoder} @@ -41,6 +122,7 @@ func NewLevelDBBlockConsumer( func (ldbc *LevelDBBlockConsumer) Close() error { ldbc.wo.Close() ldbc.db.Close() + ldbc.opts.Close() return nil }