Merged feature branch redis-priority to remove unnecessary locking when using LevelBD.

This commit is contained in:
Sascha L. Teichmann 2017-04-15 13:20:43 +02:00
commit 9f8ff8b393
1 changed files with 42 additions and 70 deletions

View File

@ -6,7 +6,6 @@ package main
import (
"log"
"sync"
"bitbucket.org/s_l_teichmann/mtsatellite/common"
@ -22,7 +21,6 @@ type LevelDBBackend struct {
decoder common.KeyTranscoder
changeTracker *changeTracker
mutex sync.RWMutex
}
type LevelDBSession struct {
@ -72,7 +70,8 @@ func NewLeveDBBackend(
interleaved: interleaved,
encoder: encoder,
decoder: decoder,
changeTracker: changeTracker}
changeTracker: changeTracker,
}
if !interleaved {
if err = ldb.buildCoverage(); err != nil {
@ -109,18 +108,6 @@ func (ldb *LevelDBBackend) buildCoverage() error {
return nil
}
func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) {
ldb.mutex.RLock()
f(ldb.db)
ldb.mutex.RUnlock()
}
func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) {
ldb.mutex.Lock()
f(ldb.db)
ldb.mutex.Unlock()
}
func (ldb *LevelDBBackend) NewSession() (Session, error) {
return &LevelDBSession{ldb, nil}, nil
}
@ -144,23 +131,21 @@ func (ldbs *LevelDBSession) Del(hash, key []byte) (success bool, err error) {
if key, err = ldbs.backend.decoder(key); err != nil {
return
}
ldbs.backend.doWrite(func(db *leveldb.DB) {
ro := leveldb.NewReadOptions()
defer ro.Close()
var data []byte
data, err = ldbs.backend.db.Get(ro, key)
if err != nil {
return
}
if data == nil {
success = false
return
}
success = true
wo := leveldb.NewWriteOptions()
defer wo.Close()
err = ldbs.backend.db.Delete(wo, key)
})
ro := leveldb.NewReadOptions()
defer ro.Close()
var data []byte
data, err = ldbs.backend.db.Get(ro, key)
if err != nil {
return
}
if data == nil {
success = false
return
}
success = true
wo := leveldb.NewWriteOptions()
defer wo.Close()
err = ldbs.backend.db.Delete(wo, key)
return
}
@ -168,16 +153,14 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) {
if key, err = ldbs.backend.decoder(key); err != nil {
return
}
ldbs.backend.doRead(func(db *leveldb.DB) {
ro := leveldb.NewReadOptions()
value, err = ldbs.backend.db.Get(ro, key)
//if err != nil {
// log.Printf("Fetch key '%s' failed.\n", key)
//} else {
// log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value))
//}
ro.Close()
})
ro := leveldb.NewReadOptions()
value, err = ldbs.backend.db.Get(ro, key)
//if err != nil {
// log.Printf("Fetch key '%s' failed.\n", key)
//} else {
// log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value))
//}
ro.Close()
return
}
@ -201,21 +184,20 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err
if key, err = ldbs.backend.decoder(key); err != nil {
return
}
ldbs.backend.doWrite(func(db *leveldb.DB) {
if exists, err = keyExists(db, key); err != nil {
return
}
if ldbs.tx != nil {
ldbs.tx.Put(key, value)
return
}
wo := leveldb.NewWriteOptions()
err = db.Put(wo, key, value)
wo.Close()
})
if err != nil {
if exists, err = keyExists(ldbs.backend.db, key); err != nil {
return
}
if ldbs.tx != nil {
ldbs.tx.Put(key, value)
return
} else {
wo := leveldb.NewWriteOptions()
err = ldbs.backend.db.Put(wo, key, value)
wo.Close()
if err != nil {
return
}
}
// This technically too early because this is done in a transactions
// which are commited (and possible fail) later.
if ldbs.backend.changeTracker != nil || ldbs.backend.coverage != nil {
@ -245,13 +227,11 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) {
return
}
ldbs.tx = nil
ldbs.backend.doWrite(func(db *leveldb.DB) {
wo := leveldb.NewWriteOptions()
wo.SetSync(true)
err = db.Write(wo, tx)
wo.Close()
tx.Close()
})
wo := leveldb.NewWriteOptions()
wo.SetSync(true)
err = ldbs.backend.db.Write(wo, tx)
wo.Close()
tx.Close()
return
}
@ -259,8 +239,6 @@ func (ldbs *LevelDBSession) AllKeys(
hash []byte,
done <-chan struct{}) (<-chan []byte, int, error) {
ldbs.backend.mutex.RLock()
ro := leveldb.NewReadOptions()
ro.SetFillCache(false)
@ -274,14 +252,12 @@ func (ldbs *LevelDBSession) AllKeys(
if err := it.GetError(); err != nil {
it.Close()
ro.Close()
ldbs.backend.mutex.RUnlock()
return nil, n, err
}
keys := make(chan []byte)
go func() {
ldbs.backend.mutex.RUnlock()
defer ro.Close()
defer close(keys)
defer it.Close()
@ -340,8 +316,6 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
go func() {
defer close(blocks)
ldbs.backend.mutex.RLock()
defer ldbs.backend.mutex.RUnlock()
ro := leveldb.NewReadOptions()
defer ro.Close()
@ -402,8 +376,6 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery(
go func() {
defer close(blocks)
ldbs.backend.mutex.RLock()
defer ldbs.backend.mutex.RUnlock()
ro := leveldb.NewReadOptions()
defer ro.Close()