mirror of
https://bitbucket.org/s_l_teichmann/mtsatellite
synced 2024-12-24 17:20:18 +01:00
138 lines
2.7 KiB
Go
138 lines
2.7 KiB
Go
// Copyright 2014 by Sascha L. Teichmann
|
|
// Use of this source code is governed by the MIT license
|
|
// that can be found in the LICENSE file.
|
|
|
|
package main
|
|
|
|
import (
|
|
"os"
|
|
|
|
"bitbucket.org/s_l_teichmann/mtredisalize/common"
|
|
|
|
leveldb "github.com/jmhodges/levigo"
|
|
)
|
|
|
|
type (
|
|
LevelDBBlockProducer struct {
|
|
db *leveldb.DB
|
|
opts *leveldb.Options
|
|
ro *leveldb.ReadOptions
|
|
iterator *leveldb.Iterator
|
|
splitter common.KeySplitter
|
|
decoder common.KeyDecoder
|
|
}
|
|
|
|
LevelDBBlockConsumer struct {
|
|
db *leveldb.DB
|
|
opts *leveldb.Options
|
|
wo *leveldb.WriteOptions
|
|
joiner common.KeyJoiner
|
|
encoder common.KeyEncoder
|
|
}
|
|
)
|
|
|
|
func NewLevelDBBlockProducer(path string,
|
|
splitter common.KeySplitter,
|
|
decoder common.KeyDecoder) (ldbp *LevelDBBlockProducer, err error) {
|
|
|
|
// check if we can stat it -> exists.
|
|
if _, err = os.Stat(path); err != nil {
|
|
return
|
|
}
|
|
|
|
opts := leveldb.NewOptions()
|
|
opts.SetCreateIfMissing(false)
|
|
|
|
var db *leveldb.DB
|
|
if db, err = leveldb.Open(path, opts); err != nil {
|
|
opts.Close()
|
|
return
|
|
}
|
|
|
|
ro := leveldb.NewReadOptions()
|
|
ro.SetFillCache(false)
|
|
|
|
iterator := db.NewIterator(ro)
|
|
iterator.SeekToFirst()
|
|
|
|
ldbp = &LevelDBBlockProducer{
|
|
db: db,
|
|
opts: opts,
|
|
ro: ro,
|
|
iterator: iterator,
|
|
splitter: splitter,
|
|
decoder: decoder}
|
|
return
|
|
}
|
|
|
|
func (ldbp *LevelDBBlockProducer) Close() error {
|
|
if ldbp.iterator != nil {
|
|
ldbp.iterator.Close()
|
|
}
|
|
ldbp.ro.Close()
|
|
ldbp.db.Close()
|
|
ldbp.opts.Close()
|
|
return nil
|
|
}
|
|
|
|
func (ldbp *LevelDBBlockProducer) Next(block *common.Block) (err error) {
|
|
if ldbp.iterator == nil {
|
|
err = common.ErrNoMoreBlocks
|
|
return
|
|
}
|
|
if !ldbp.iterator.Valid() {
|
|
if err = ldbp.iterator.GetError(); err == nil {
|
|
err = common.ErrNoMoreBlocks
|
|
}
|
|
ldbp.iterator.Close()
|
|
ldbp.iterator = nil
|
|
return
|
|
}
|
|
var key int64
|
|
if key, err = ldbp.decoder(ldbp.iterator.Key()); err != nil {
|
|
return
|
|
}
|
|
block.Coord = ldbp.splitter(key)
|
|
block.Data = ldbp.iterator.Value()
|
|
ldbp.iterator.Next()
|
|
return
|
|
}
|
|
|
|
func NewLevelDBBlockConsumer(
|
|
path string,
|
|
joiner common.KeyJoiner,
|
|
encoder common.KeyEncoder) (ldbc *LevelDBBlockConsumer, err error) {
|
|
|
|
opts := leveldb.NewOptions()
|
|
opts.SetCreateIfMissing(true)
|
|
|
|
var db *leveldb.DB
|
|
if db, err = leveldb.Open(path, opts); err != nil {
|
|
return
|
|
}
|
|
|
|
ldbc = &LevelDBBlockConsumer{
|
|
db: db,
|
|
opts: opts,
|
|
wo: leveldb.NewWriteOptions(),
|
|
joiner: joiner,
|
|
encoder: encoder}
|
|
return
|
|
}
|
|
|
|
func (ldbc *LevelDBBlockConsumer) Close() error {
|
|
ldbc.wo.Close()
|
|
ldbc.db.Close()
|
|
ldbc.opts.Close()
|
|
return nil
|
|
}
|
|
|
|
func (ldbc *LevelDBBlockConsumer) Consume(block *common.Block) (err error) {
|
|
var encodedKey []byte
|
|
if encodedKey, err = ldbc.encoder(ldbc.joiner(block.Coord)); err != nil {
|
|
return
|
|
}
|
|
err = ldbc.db.Put(ldbc.wo, encodedKey, block.Data)
|
|
return
|
|
}
|