// Copyright 2014 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "log" "sync" "bitbucket.org/s_l_teichmann/mtredisalize/common" leveldb "github.com/jmhodges/levigo" ) type LevelDBBackend struct { cache *leveldb.Cache db *leveldb.DB encoder common.KeyTranscoder decoder common.KeyTranscoder mutex sync.RWMutex } type LevelDBSession struct { backend *LevelDBBackend tx *leveldb.WriteBatch } func NewLeveDBBackend( path string, encoder common.KeyTranscoder, decoder common.KeyTranscoder, cacheSize int) (ldb *LevelDBBackend, err error) { opts := leveldb.NewOptions() var cache *leveldb.Cache if cacheSize > 0 { cache = leveldb.NewLRUCache(cacheSize * 1024 * 1024) opts.SetCache(cache) } opts.SetCreateIfMissing(true) var db *leveldb.DB if db, err = leveldb.Open(path, opts); err != nil { if cache != nil { cache.Close() } return } ldb = &LevelDBBackend{ cache: cache, db: db, encoder: encoder, decoder: decoder} return } func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) { ldb.mutex.RLock() f(ldb.db) ldb.mutex.RUnlock() } func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) { ldb.mutex.Lock() f(ldb.db) ldb.mutex.Unlock() } func (ldb *LevelDBBackend) NewSession() (Session, error) { return &LevelDBSession{ldb, nil}, nil } func (ldbs *LevelDBSession) Close() error { if ldbs.tx != nil { ldbs.tx.Close() } return nil } func (ldb *LevelDBBackend) Shutdown() error { ldb.db.Close() if ldb.cache != nil { ldb.cache.Close() } return nil } func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } ldbs.backend.doRead(func(db *leveldb.DB) { ro := leveldb.NewReadOptions() value, err = ldbs.backend.db.Get(ro, key) //if err != nil { // log.Printf("Fetch key '%s' failed.\n", key) //} else { // log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value)) //} ro.Close() }) return } func (ldbs *LevelDBSession) InTransaction() bool { return ldbs.tx != nil } func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) { ro := leveldb.NewReadOptions() defer ro.Close() var data []byte if data, err = db.Get(ro, key); err != nil { return } exists = data != nil return } func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } ldbs.backend.doWrite(func(db *leveldb.DB) { if exists, err = keyExists(db, key); err != nil { return } if ldbs.tx != nil { ldbs.tx.Put(key, value) return } wo := leveldb.NewWriteOptions() err = ldbs.backend.db.Put(wo, key, value) wo.Close() }) return } func (ldbs *LevelDBSession) BeginTransaction() error { ldbs.tx = leveldb.NewWriteBatch() return nil } func (ldbs *LevelDBSession) CommitTransaction() (err error) { tx := ldbs.tx if tx == nil { log.Println("WARN: No transaction running.") return } ldbs.tx = nil ldbs.backend.doWrite(func(db *leveldb.DB) { wo := leveldb.NewWriteOptions() wo.SetSync(true) err = db.Write(wo, tx) wo.Close() tx.Close() }) return } func (ldbs *LevelDBSession) AllKeys(hash []byte) (keys chan []byte, n int, err error) { ldbs.backend.mutex.RLock() ro := leveldb.NewReadOptions() ro.SetFillCache(false) it := ldbs.backend.db.NewIterator(ro) it.SeekToFirst() for ; it.Valid(); it.Next() { n++ } if err = it.GetError(); err != nil { it.Close() ro.Close() ldbs.backend.mutex.RUnlock() return } keys = make(chan []byte) go func() { ldbs.backend.mutex.RUnlock() defer ro.Close() defer close(keys) defer it.Close() it.SeekToFirst() encoder := ldbs.backend.encoder for ; it.Valid(); it.Next() { if key, err := encoder(it.Key()); err == nil { keys <- key } else { log.Printf("WARN: %s\n", err) return } } if err := it.GetError(); err != nil { log.Printf("WARN: %s\n", err) } }() return }