Priotize again.

This commit is contained in:
Sascha L. Teichmann 2017-03-21 16:25:58 +01:00
parent 54a4527fda
commit 65119c1ee4

View File

@ -6,6 +6,7 @@ package main
import (
"log"
"sync"
"bitbucket.org/s_l_teichmann/mtsatellite/common"
@ -21,6 +22,9 @@ type LevelDBBackend struct {
decoder common.KeyTranscoder
changeTracker *changeTracker
priorityCond *sync.Cond
priority int
}
type LevelDBSession struct {
@ -71,6 +75,7 @@ func NewLeveDBBackend(
encoder: encoder,
decoder: decoder,
changeTracker: changeTracker,
priorityCond: sync.NewCond(new(sync.Mutex)),
}
if !interleaved {
@ -83,6 +88,28 @@ func NewLeveDBBackend(
return
}
func (ldb *LevelDBBackend) suspend() {
ldb.priorityCond.L.Lock()
for ldb.priority > 0 {
ldb.priorityCond.Wait()
}
ldb.priorityCond.L.Unlock()
}
func (ldb *LevelDBBackend) grab() {
ldb.priorityCond.L.Lock()
ldb.priority++
ldb.priorityCond.L.Unlock()
}
func (ldb *LevelDBBackend) ungrab() {
ldb.priorityCond.L.Lock()
if ldb.priority--; ldb.priority <= 0 {
ldb.priorityCond.Broadcast()
}
ldb.priorityCond.L.Unlock()
}
func (ldb *LevelDBBackend) buildCoverage() error {
log.Println("INFO: Start building coverage index (this may take some time)...")
@ -153,6 +180,8 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) {
if key, err = ldbs.backend.decoder(key); err != nil {
return
}
ldbs.backend.grab()
defer ldbs.backend.ungrab()
ro := leveldb.NewReadOptions()
value, err = ldbs.backend.db.Get(ro, key)
//if err != nil {
@ -184,6 +213,8 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err
if key, err = ldbs.backend.decoder(key); err != nil {
return
}
ldbs.backend.grab()
defer ldbs.backend.ungrab()
if exists, err = keyExists(ldbs.backend.db, key); err != nil {
return
}
@ -325,6 +356,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
for _, r := range ldbs.backend.coverage.Query(c1, c2) {
a.Z, b.Z = int16(r.Z), int16(r.Z)
a.X, b.X = int16(r.X1), int16(r.X2)
ldbs.backend.suspend()
for a.Y = r.Y2; a.Y >= r.Y1; a.Y-- {
b.Y = a.Y
// The keys in the database are stored and ordered as strings
@ -415,6 +447,7 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery(
it.Next()
} else {
next := common.BigMin(zmin, zmax, zcode)
ldbs.backend.suspend()
//log.Printf("seeking to: %d\n", next)
it.Seek(common.ToBigEndian(next))
//log.Printf("seeking done: %d\n", next)