Free memory of write batches in LevelDB backend. Use an extra r/w mutex.

This commit is contained in:
Sascha L. Teichmann 2014-08-07 15:16:12 +02:00
parent 954dddf795
commit c09d74ae0f
1 changed files with 48 additions and 18 deletions

View File

@ -6,6 +6,7 @@ package main
import (
"log"
"sync"
leveldb "github.com/jmhodges/levigo"
)
@ -13,6 +14,7 @@ import (
type LevelDBBackend struct {
cache *leveldb.Cache
db *leveldb.DB
mutex sync.RWMutex
}
type LevelDBSession struct {
@ -36,6 +38,18 @@ func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err erro
return
}
func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) {
ldb.mutex.RLock()
f(ldb.db)
ldb.mutex.RUnlock()
}
func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) {
ldb.mutex.Lock()
f(ldb.db)
ldb.mutex.Unlock()
}
func (ldb *LevelDBBackend) NewSession() (Session, error) {
return &LevelDBSession{ldb, nil}, nil
}
@ -54,9 +68,19 @@ func (ldb *LevelDBBackend) Shutdown() error {
}
func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) {
ro := leveldb.NewReadOptions()
value, err = ldbs.backend.db.Get(ro, key)
ro.Close()
var pos int64
if pos, err = bytes2pos(key); err != nil {
return
}
// Re-code it to make LevelDB happy.
key = pos2bytes(pos)
ldbs.backend.doRead(func(db *leveldb.DB) {
ro := leveldb.NewReadOptions()
value, err = ldbs.backend.db.Get(ro, key)
ro.Close()
})
return
}
@ -64,11 +88,11 @@ func (ldbs *LevelDBSession) InTransaction() bool {
return ldbs.tx != nil
}
func (ldbs *LevelDBSession) keyExists(key []byte) (exists bool, err error) {
func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) {
ro := leveldb.NewReadOptions()
defer ro.Close()
var data []byte
if data, err = ldbs.backend.db.Get(ro, key); err != nil {
if data, err = db.Get(ro, key); err != nil {
return
}
exists = data != nil
@ -84,18 +108,20 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err
// Re-code it to make LevelDB happy.
key = pos2bytes(pos)
if exists, err = ldbs.keyExists(key); err != nil {
return
}
ldbs.backend.doWrite(func(db *leveldb.DB) {
if exists, err = keyExists(db, key); err != nil {
return
}
if ldbs.tx != nil {
ldbs.tx.Put(key, value)
return
}
if ldbs.tx != nil {
ldbs.tx.Put(key, value)
return
}
wo := leveldb.NewWriteOptions()
err = ldbs.backend.db.Put(wo, key, value)
wo.Close()
wo := leveldb.NewWriteOptions()
err = ldbs.backend.db.Put(wo, key, value)
wo.Close()
})
return
}
@ -112,8 +138,12 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) {
return
}
ldbs.tx = nil
wo := leveldb.NewWriteOptions()
err = ldbs.backend.db.Write(wo, tx)
wo.Close()
ldbs.backend.doWrite(func(db *leveldb.DB) {
wo := leveldb.NewWriteOptions()
wo.SetSync(true)
err = db.Write(wo, tx)
wo.Close()
tx.Close()
})
return
}