diff --git a/leveldb.go b/leveldb.go index b245e41..5db6caa 100644 --- a/leveldb.go +++ b/leveldb.go @@ -6,6 +6,7 @@ package main import ( "log" + "sync" leveldb "github.com/jmhodges/levigo" ) @@ -13,6 +14,7 @@ import ( type LevelDBBackend struct { cache *leveldb.Cache db *leveldb.DB + mutex sync.RWMutex } type LevelDBSession struct { @@ -36,6 +38,18 @@ func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err erro return } +func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) { + ldb.mutex.RLock() + f(ldb.db) + ldb.mutex.RUnlock() +} + +func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) { + ldb.mutex.Lock() + f(ldb.db) + ldb.mutex.Unlock() +} + func (ldb *LevelDBBackend) NewSession() (Session, error) { return &LevelDBSession{ldb, nil}, nil } @@ -54,9 +68,19 @@ func (ldb *LevelDBBackend) Shutdown() error { } func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { - ro := leveldb.NewReadOptions() - value, err = ldbs.backend.db.Get(ro, key) - ro.Close() + + var pos int64 + if pos, err = bytes2pos(key); err != nil { + return + } + // Re-code it to make LevelDB happy. + key = pos2bytes(pos) + + ldbs.backend.doRead(func(db *leveldb.DB) { + ro := leveldb.NewReadOptions() + value, err = ldbs.backend.db.Get(ro, key) + ro.Close() + }) return } @@ -64,11 +88,11 @@ func (ldbs *LevelDBSession) InTransaction() bool { return ldbs.tx != nil } -func (ldbs *LevelDBSession) keyExists(key []byte) (exists bool, err error) { +func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) { ro := leveldb.NewReadOptions() defer ro.Close() var data []byte - if data, err = ldbs.backend.db.Get(ro, key); err != nil { + if data, err = db.Get(ro, key); err != nil { return } exists = data != nil @@ -84,18 +108,20 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err // Re-code it to make LevelDB happy. key = pos2bytes(pos) - if exists, err = ldbs.keyExists(key); err != nil { - return - } + ldbs.backend.doWrite(func(db *leveldb.DB) { + if exists, err = keyExists(db, key); err != nil { + return + } - if ldbs.tx != nil { - ldbs.tx.Put(key, value) - return - } + if ldbs.tx != nil { + ldbs.tx.Put(key, value) + return + } - wo := leveldb.NewWriteOptions() - err = ldbs.backend.db.Put(wo, key, value) - wo.Close() + wo := leveldb.NewWriteOptions() + err = ldbs.backend.db.Put(wo, key, value) + wo.Close() + }) return } @@ -112,8 +138,12 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) { return } ldbs.tx = nil - wo := leveldb.NewWriteOptions() - err = ldbs.backend.db.Write(wo, tx) - wo.Close() + ldbs.backend.doWrite(func(db *leveldb.DB) { + wo := leveldb.NewWriteOptions() + wo.SetSync(true) + err = db.Write(wo, tx) + wo.Close() + tx.Close() + }) return }