// Copyright 2014 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "log" "sync" leveldb "github.com/jmhodges/levigo" ) type LevelDBBackend struct { cache *leveldb.Cache db *leveldb.DB mutex sync.RWMutex } type LevelDBSession struct { backend *LevelDBBackend tx *leveldb.WriteBatch } func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err error) { cache := leveldb.NewLRUCache(cacheSize * 1024 * 1024) opts := leveldb.NewOptions() opts.SetCache(cache) opts.SetCreateIfMissing(true) var db *leveldb.DB if db, err = leveldb.Open(path, opts); err != nil { cache.Close() return } ldb = &LevelDBBackend{ cache: cache, db: db} return } func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) { ldb.mutex.RLock() f(ldb.db) ldb.mutex.RUnlock() } func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) { ldb.mutex.Lock() f(ldb.db) ldb.mutex.Unlock() } func (ldb *LevelDBBackend) NewSession() (Session, error) { return &LevelDBSession{ldb, nil}, nil } func (ldbs *LevelDBSession) Close() error { if ldbs.tx != nil { ldbs.tx.Close() } return nil } func (ldb *LevelDBBackend) Shutdown() error { ldb.db.Close() ldb.cache.Close() return nil } func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { ldbs.backend.doRead(func(db *leveldb.DB) { ro := leveldb.NewReadOptions() value, err = ldbs.backend.db.Get(ro, key) //if err != nil { // log.Printf("Fetch key '%s' failed.\n", key) //} else { // log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value)) //} ro.Close() }) return } func (ldbs *LevelDBSession) InTransaction() bool { return ldbs.tx != nil } func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) { ro := leveldb.NewReadOptions() defer ro.Close() var data []byte if data, err = db.Get(ro, key); err != nil { return } exists = data != nil return } func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) { ldbs.backend.doWrite(func(db *leveldb.DB) { if exists, err = keyExists(db, key); err != nil { return } if ldbs.tx != nil { //log.Printf("Stored in tx: key = '%s' len(value) = %d\n", key, len(value)) ldbs.tx.Put(key, value) return } //log.Printf("Stored: key = '%s' len(value) = %d\n", key, len(value)) wo := leveldb.NewWriteOptions() err = ldbs.backend.db.Put(wo, key, value) wo.Close() }) return } func (ldbs *LevelDBSession) BeginTransaction() error { ldbs.tx = leveldb.NewWriteBatch() return nil } func (ldbs *LevelDBSession) CommitTransaction() (err error) { tx := ldbs.tx if tx == nil { log.Println("WARN: No transaction running.") return } ldbs.tx = nil ldbs.backend.doWrite(func(db *leveldb.DB) { wo := leveldb.NewWriteOptions() wo.SetSync(true) err = db.Write(wo, tx) wo.Close() tx.Close() }) return }