mtsatellite/leveldb.go

191 lines
3.7 KiB
Go
Raw Normal View History

2014-08-03 15:59:56 +02:00
// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"log"
"sync"
leveldb "github.com/jmhodges/levigo"
)
type LevelDBBackend struct {
cache *leveldb.Cache
db *leveldb.DB
mutex sync.RWMutex
}
type LevelDBSession struct {
backend *LevelDBBackend
tx *leveldb.WriteBatch
}
func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err error) {
opts := leveldb.NewOptions()
var cache *leveldb.Cache
if cacheSize > 0 {
cache = leveldb.NewLRUCache(cacheSize * 1024 * 1024)
opts.SetCache(cache)
}
opts.SetCreateIfMissing(true)
var db *leveldb.DB
if db, err = leveldb.Open(path, opts); err != nil {
if cache != nil {
cache.Close()
}
return
}
ldb = &LevelDBBackend{
cache: cache,
db: db}
return
}
func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) {
ldb.mutex.RLock()
f(ldb.db)
ldb.mutex.RUnlock()
}
func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) {
ldb.mutex.Lock()
f(ldb.db)
ldb.mutex.Unlock()
}
func (ldb *LevelDBBackend) NewSession() (Session, error) {
return &LevelDBSession{ldb, nil}, nil
}
func (ldbs *LevelDBSession) Close() error {
if ldbs.tx != nil {
ldbs.tx.Close()
2014-08-04 15:29:26 +02:00
}
return nil
}
func (ldb *LevelDBBackend) Shutdown() error {
ldb.db.Close()
if ldb.cache != nil {
ldb.cache.Close()
}
return nil
}
func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) {
ldbs.backend.doRead(func(db *leveldb.DB) {
ro := leveldb.NewReadOptions()
value, err = ldbs.backend.db.Get(ro, key)
2014-08-08 00:16:00 +02:00
//if err != nil {
// log.Printf("Fetch key '%s' failed.\n", key)
//} else {
// log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value))
//}
ro.Close()
})
2014-08-04 22:23:16 +02:00
return
}
func (ldbs *LevelDBSession) InTransaction() bool {
return ldbs.tx != nil
}
func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) {
ro := leveldb.NewReadOptions()
defer ro.Close()
var data []byte
if data, err = db.Get(ro, key); err != nil {
return
}
exists = data != nil
return
}
func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) {
ldbs.backend.doWrite(func(db *leveldb.DB) {
if exists, err = keyExists(db, key); err != nil {
return
}
if ldbs.tx != nil {
2014-08-08 00:16:00 +02:00
//log.Printf("Stored in tx: key = '%s' len(value) = %d\n", key, len(value))
ldbs.tx.Put(key, value)
return
}
2014-08-08 00:16:00 +02:00
//log.Printf("Stored: key = '%s' len(value) = %d\n", key, len(value))
wo := leveldb.NewWriteOptions()
err = ldbs.backend.db.Put(wo, key, value)
wo.Close()
})
return
}
func (ldbs *LevelDBSession) BeginTransaction() error {
ldbs.tx = leveldb.NewWriteBatch()
return nil
}
func (ldbs *LevelDBSession) CommitTransaction() (err error) {
tx := ldbs.tx
if tx == nil {
log.Println("WARN: No transaction running.")
return
}
ldbs.tx = nil
ldbs.backend.doWrite(func(db *leveldb.DB) {
wo := leveldb.NewWriteOptions()
wo.SetSync(true)
err = db.Write(wo, tx)
wo.Close()
tx.Close()
})
2014-08-04 15:29:26 +02:00
return
}
func (ldbs *LevelDBSession) AllKeys(hash []byte) (keys chan []byte, n int, err error) {
ldbs.backend.mutex.RLock()
ro := leveldb.NewReadOptions()
ro.SetFillCache(false)
it := ldbs.backend.db.NewIterator(ro)
for ; it.Valid(); it.Next() {
n++
}
if err = it.GetError(); err != nil {
ro.Close()
it.Close()
ldbs.backend.mutex.RUnlock()
return
}
it.Close()
keys = make(chan []byte)
go func() {
ldbs.backend.mutex.RUnlock()
defer ro.Close()
defer close(keys)
it := ldbs.backend.db.NewIterator(ro)
defer it.Close()
for ; it.Valid(); it.Next() {
keys <- it.Key()
}
if err := it.GetError(); err != nil {
log.Printf("WARN: %s\n", err)
}
}()
return
}