// Copyright 2014, 2015 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "os" "bitbucket.org/s_l_teichmann/mtsatellite/common" leveldb "github.com/jmhodges/levigo" ) type ( // LevelDBBlockProducer is a helper to fetch blocks from a LevelDB. LevelDBBlockProducer struct { db *leveldb.DB opts *leveldb.Options ro *leveldb.ReadOptions iterator *leveldb.Iterator splitter common.KeySplitter decoder common.KeyDecoder } // LevelDBBlockConsumer is a helper to store blocks in a LevelDB. LevelDBBlockConsumer struct { db *leveldb.DB opts *leveldb.Options wo *leveldb.WriteOptions joiner common.KeyJoiner encoder common.KeyEncoder } ) // NewLevelDBBlockProducer returns a new helper to fetch blocks from a LevelDB. func NewLevelDBBlockProducer(path string, splitter common.KeySplitter, decoder common.KeyDecoder) (ldbp *LevelDBBlockProducer, err error) { // check if we can stat it -> exists. if _, err = os.Stat(path); err != nil { return } opts := leveldb.NewOptions() opts.SetCreateIfMissing(false) var db *leveldb.DB if db, err = leveldb.Open(path, opts); err != nil { opts.Close() return } ro := leveldb.NewReadOptions() ro.SetFillCache(false) iterator := db.NewIterator(ro) iterator.SeekToFirst() ldbp = &LevelDBBlockProducer{ db: db, opts: opts, ro: ro, iterator: iterator, splitter: splitter, decoder: decoder} return } // Close closes a helper to fetch blocks from a LevelDB. func (ldbp *LevelDBBlockProducer) Close() error { if ldbp.iterator != nil { ldbp.iterator.Close() } ldbp.ro.Close() ldbp.db.Close() ldbp.opts.Close() return nil } // Next fetches the next block from a LevelDB. func (ldbp *LevelDBBlockProducer) Next(block *common.Block) (err error) { if ldbp.iterator == nil { err = common.ErrNoMoreBlocks return } if !ldbp.iterator.Valid() { if err = ldbp.iterator.GetError(); err == nil { err = common.ErrNoMoreBlocks } ldbp.iterator.Close() ldbp.iterator = nil return } var key int64 if key, err = ldbp.decoder(ldbp.iterator.Key()); err != nil { return } block.Coord = ldbp.splitter(key) block.Data = ldbp.iterator.Value() ldbp.iterator.Next() return } // NewLevelDBBlockConsumer returns a new helper to store blocks in a LevelDB. func NewLevelDBBlockConsumer( path string, joiner common.KeyJoiner, encoder common.KeyEncoder) (ldbc *LevelDBBlockConsumer, err error) { opts := leveldb.NewOptions() opts.SetCreateIfMissing(true) var db *leveldb.DB if db, err = leveldb.Open(path, opts); err != nil { return } ldbc = &LevelDBBlockConsumer{ db: db, opts: opts, wo: leveldb.NewWriteOptions(), joiner: joiner, encoder: encoder} return } // Close closes a helper to store blocks in a LevelDB. func (ldbc *LevelDBBlockConsumer) Close() error { ldbc.wo.Close() ldbc.db.Close() ldbc.opts.Close() return nil } // Consume stores a block in LevelDB. func (ldbc *LevelDBBlockConsumer) Consume(block *common.Block) (err error) { var encodedKey []byte if encodedKey, err = ldbc.encoder(ldbc.joiner(block.Coord)); err != nil { return } err = ldbc.db.Put(ldbc.wo, encodedKey, block.Data) return }