Removed locking from LevelDB entirely.

It is not needed because LevelDB itself handles concurreny issues
and it causes a lot of contentions and massive lags.
This commit is contained in:
Sascha L. Teichmann 2017-03-21 08:40:44 +01:00
parent aad612d097
commit e4ad3a84d8

View File

@ -6,7 +6,6 @@ package main
import ( import (
"log" "log"
"sync"
"bitbucket.org/s_l_teichmann/mtsatellite/common" "bitbucket.org/s_l_teichmann/mtsatellite/common"
@ -22,10 +21,6 @@ type LevelDBBackend struct {
decoder common.KeyTranscoder decoder common.KeyTranscoder
changeTracker *changeTracker changeTracker *changeTracker
mutex sync.RWMutex
priority *sync.Cond
priorityCount int
} }
type LevelDBSession struct { type LevelDBSession struct {
@ -76,7 +71,7 @@ func NewLeveDBBackend(
encoder: encoder, encoder: encoder,
decoder: decoder, decoder: decoder,
changeTracker: changeTracker, changeTracker: changeTracker,
priority: sync.NewCond(new(sync.Mutex))} }
if !interleaved { if !interleaved {
if err = ldb.buildCoverage(); err != nil { if err = ldb.buildCoverage(); err != nil {
@ -88,28 +83,6 @@ func NewLeveDBBackend(
return return
} }
func (ldb *LevelDBBackend) suspend() {
ldb.priority.L.Lock()
for ldb.priorityCount > 0 {
ldb.priority.Wait()
}
ldb.priority.L.Unlock()
}
func (ldb *LevelDBBackend) grab() {
ldb.priority.L.Lock()
ldb.priorityCount++
ldb.priority.L.Unlock()
}
func (ldb *LevelDBBackend) ungrab() {
ldb.priority.L.Lock()
if ldb.priorityCount--; ldb.priorityCount <= 0 {
ldb.priority.Broadcast()
}
ldb.priority.L.Unlock()
}
func (ldb *LevelDBBackend) buildCoverage() error { func (ldb *LevelDBBackend) buildCoverage() error {
log.Println("INFO: Start building coverage index (this may take some time)...") log.Println("INFO: Start building coverage index (this may take some time)...")
@ -135,18 +108,6 @@ func (ldb *LevelDBBackend) buildCoverage() error {
return nil return nil
} }
func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) {
ldb.mutex.RLock()
f(ldb.db)
ldb.mutex.RUnlock()
}
func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) {
ldb.mutex.Lock()
f(ldb.db)
ldb.mutex.Unlock()
}
func (ldb *LevelDBBackend) NewSession() (Session, error) { func (ldb *LevelDBBackend) NewSession() (Session, error) {
return &LevelDBSession{ldb, nil}, nil return &LevelDBSession{ldb, nil}, nil
} }
@ -170,23 +131,21 @@ func (ldbs *LevelDBSession) Del(hash, key []byte) (success bool, err error) {
if key, err = ldbs.backend.decoder(key); err != nil { if key, err = ldbs.backend.decoder(key); err != nil {
return return
} }
ldbs.backend.doWrite(func(db *leveldb.DB) { ro := leveldb.NewReadOptions()
ro := leveldb.NewReadOptions() defer ro.Close()
defer ro.Close() var data []byte
var data []byte data, err = ldbs.backend.db.Get(ro, key)
data, err = ldbs.backend.db.Get(ro, key) if err != nil {
if err != nil { return
return }
} if data == nil {
if data == nil { success = false
success = false return
return }
} success = true
success = true wo := leveldb.NewWriteOptions()
wo := leveldb.NewWriteOptions() defer wo.Close()
defer wo.Close() err = ldbs.backend.db.Delete(wo, key)
err = ldbs.backend.db.Delete(wo, key)
})
return return
} }
@ -194,19 +153,14 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) {
if key, err = ldbs.backend.decoder(key); err != nil { if key, err = ldbs.backend.decoder(key); err != nil {
return return
} }
ldbs.backend.doRead(func(db *leveldb.DB) { ro := leveldb.NewReadOptions()
ldbs.backend.grab() value, err = ldbs.backend.db.Get(ro, key)
defer ldbs.backend.ungrab() //if err != nil {
// log.Printf("Fetch key '%s' failed.\n", key)
ro := leveldb.NewReadOptions() //} else {
value, err = ldbs.backend.db.Get(ro, key) // log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value))
//if err != nil { //}
// log.Printf("Fetch key '%s' failed.\n", key) ro.Close()
//} else {
// log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value))
//}
ro.Close()
})
return return
} }
@ -230,20 +184,16 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err
if key, err = ldbs.backend.decoder(key); err != nil { if key, err = ldbs.backend.decoder(key); err != nil {
return return
} }
ldbs.backend.doWrite(func(db *leveldb.DB) { if exists, err = keyExists(ldbs.backend.db, key); err != nil {
ldbs.backend.grab() return
defer ldbs.backend.ungrab() }
if exists, err = keyExists(db, key); err != nil { if ldbs.tx != nil {
return ldbs.tx.Put(key, value)
} return
if ldbs.tx != nil { }
ldbs.tx.Put(key, value) wo := leveldb.NewWriteOptions()
return err = ldbs.backend.db.Put(wo, key, value)
} wo.Close()
wo := leveldb.NewWriteOptions()
err = db.Put(wo, key, value)
wo.Close()
})
if err != nil { if err != nil {
return return
} }
@ -276,13 +226,11 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) {
return return
} }
ldbs.tx = nil ldbs.tx = nil
ldbs.backend.doWrite(func(db *leveldb.DB) { wo := leveldb.NewWriteOptions()
wo := leveldb.NewWriteOptions() wo.SetSync(true)
wo.SetSync(true) err = ldbs.backend.db.Write(wo, tx)
err = db.Write(wo, tx) wo.Close()
wo.Close() tx.Close()
tx.Close()
})
return return
} }
@ -290,8 +238,6 @@ func (ldbs *LevelDBSession) AllKeys(
hash []byte, hash []byte,
done <-chan struct{}) (<-chan []byte, int, error) { done <-chan struct{}) (<-chan []byte, int, error) {
ldbs.backend.mutex.RLock()
ro := leveldb.NewReadOptions() ro := leveldb.NewReadOptions()
ro.SetFillCache(false) ro.SetFillCache(false)
@ -305,14 +251,12 @@ func (ldbs *LevelDBSession) AllKeys(
if err := it.GetError(); err != nil { if err := it.GetError(); err != nil {
it.Close() it.Close()
ro.Close() ro.Close()
ldbs.backend.mutex.RUnlock()
return nil, n, err return nil, n, err
} }
keys := make(chan []byte) keys := make(chan []byte)
go func() { go func() {
ldbs.backend.mutex.RUnlock()
defer ro.Close() defer ro.Close()
defer close(keys) defer close(keys)
defer it.Close() defer it.Close()
@ -371,8 +315,6 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
go func() { go func() {
defer close(blocks) defer close(blocks)
ldbs.backend.mutex.RLock()
defer ldbs.backend.mutex.RUnlock()
ro := leveldb.NewReadOptions() ro := leveldb.NewReadOptions()
defer ro.Close() defer ro.Close()
@ -433,8 +375,6 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery(
go func() { go func() {
defer close(blocks) defer close(blocks)
ldbs.backend.mutex.RLock()
defer ldbs.backend.mutex.RUnlock()
ro := leveldb.NewReadOptions() ro := leveldb.NewReadOptions()
defer ro.Close() defer ro.Close()
@ -455,7 +395,6 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery(
//log.Printf("seeking to: %d\n", zmin) //log.Printf("seeking to: %d\n", zmin)
it.Seek(common.ToBigEndian(zmin)) it.Seek(common.ToBigEndian(zmin))
for it.Valid() { for it.Valid() {
ldbs.backend.suspend()
zcode := common.FromBigEndian(it.Key()) zcode := common.FromBigEndian(it.Key())
if zcode > zmax { if zcode > zmax {