Added block producer for LevelDB in interleaver.

This commit is contained in:
Sascha L. Teichmann 2014-08-20 16:21:44 +02:00
parent 8f4a558234
commit 87b34d123e

View File

@ -5,16 +5,96 @@
package main package main
import ( import (
"os"
"bitbucket.org/s_l_teichmann/mtredisalize/common" "bitbucket.org/s_l_teichmann/mtredisalize/common"
leveldb "github.com/jmhodges/levigo" leveldb "github.com/jmhodges/levigo"
) )
type LevelDBBlockConsumer struct { type (
db *leveldb.DB LevelDBBlockProducer struct {
wo *leveldb.WriteOptions db *leveldb.DB
joiner common.KeyJoiner opts *leveldb.Options
encoder common.KeyEncoder ro *leveldb.ReadOptions
iterator *leveldb.Iterator
splitter common.KeySplitter
decoder common.KeyDecoder
}
LevelDBBlockConsumer struct {
db *leveldb.DB
opts *leveldb.Options
wo *leveldb.WriteOptions
joiner common.KeyJoiner
encoder common.KeyEncoder
}
)
func NewLevelDBlockProducer(path string,
splitter common.KeySplitter,
decoder common.KeyDecoder) (ldbp *LevelDBBlockProducer, err error) {
// check if we can stat it -> exists.
if _, err = os.Stat(path); err != nil {
return
}
opts := leveldb.NewOptions()
opts.SetCreateIfMissing(false)
var db *leveldb.DB
if db, err = leveldb.Open(path, opts); err != nil {
opts.Close()
return
}
ro := leveldb.NewReadOptions()
ro.SetFillCache(false)
iterator := db.NewIterator(ro)
iterator.SeekToFirst()
ldbp = &LevelDBBlockProducer{
db: db,
opts: opts,
ro: ro,
iterator: iterator,
splitter: splitter,
decoder: decoder}
return
}
func (ldbp *LevelDBBlockProducer) Close() error {
if ldbp.iterator != nil {
ldbp.iterator.Close()
}
ldbp.ro.Close()
ldbp.db.Close()
ldbp.opts.Close()
return nil
}
func (ldbp *LevelDBBlockProducer) Next(block *Block) (err error) {
if ldbp.iterator == nil {
err = NoMoreBlocksErr
return
}
if !ldbp.iterator.Valid() {
if err = ldbp.iterator.GetError(); err == nil {
err = NoMoreBlocksErr
}
ldbp.iterator.Close()
ldbp.iterator = nil
return
}
var key int64
if key, err = ldbp.decoder(ldbp.iterator.Key()); err != nil {
return
}
block.Coord = ldbp.splitter(key)
block.Data = ldbp.iterator.Value()
return
} }
func NewLevelDBBlockConsumer( func NewLevelDBBlockConsumer(
@ -32,6 +112,7 @@ func NewLevelDBBlockConsumer(
ldbc = &LevelDBBlockConsumer{ ldbc = &LevelDBBlockConsumer{
db: db, db: db,
opts: opts,
wo: leveldb.NewWriteOptions(), wo: leveldb.NewWriteOptions(),
joiner: joiner, joiner: joiner,
encoder: encoder} encoder: encoder}
@ -41,6 +122,7 @@ func NewLevelDBBlockConsumer(
func (ldbc *LevelDBBlockConsumer) Close() error { func (ldbc *LevelDBBlockConsumer) Close() error {
ldbc.wo.Close() ldbc.wo.Close()
ldbc.db.Close() ldbc.db.Close()
ldbc.opts.Close()
return nil return nil
} }