From 65119c1ee4985cd695e9a6ece3825f18864255ef Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Tue, 21 Mar 2017 16:25:58 +0100 Subject: [PATCH] Priotize again. --- cmd/mtredisalize/leveldb.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index 591afbd..1e5abd6 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -6,6 +6,7 @@ package main import ( "log" + "sync" "bitbucket.org/s_l_teichmann/mtsatellite/common" @@ -21,6 +22,9 @@ type LevelDBBackend struct { decoder common.KeyTranscoder changeTracker *changeTracker + + priorityCond *sync.Cond + priority int } type LevelDBSession struct { @@ -71,6 +75,7 @@ func NewLeveDBBackend( encoder: encoder, decoder: decoder, changeTracker: changeTracker, + priorityCond: sync.NewCond(new(sync.Mutex)), } if !interleaved { @@ -83,6 +88,28 @@ func NewLeveDBBackend( return } +func (ldb *LevelDBBackend) suspend() { + ldb.priorityCond.L.Lock() + for ldb.priority > 0 { + ldb.priorityCond.Wait() + } + ldb.priorityCond.L.Unlock() +} + +func (ldb *LevelDBBackend) grab() { + ldb.priorityCond.L.Lock() + ldb.priority++ + ldb.priorityCond.L.Unlock() +} + +func (ldb *LevelDBBackend) ungrab() { + ldb.priorityCond.L.Lock() + if ldb.priority--; ldb.priority <= 0 { + ldb.priorityCond.Broadcast() + } + ldb.priorityCond.L.Unlock() +} + func (ldb *LevelDBBackend) buildCoverage() error { log.Println("INFO: Start building coverage index (this may take some time)...") @@ -153,6 +180,8 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } + ldbs.backend.grab() + defer ldbs.backend.ungrab() ro := leveldb.NewReadOptions() value, err = ldbs.backend.db.Get(ro, key) //if err != nil { @@ -184,6 +213,8 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err if key, err = ldbs.backend.decoder(key); err != nil { return } + ldbs.backend.grab() + defer ldbs.backend.ungrab() if exists, err = keyExists(ldbs.backend.db, key); err != nil { return } @@ -325,6 +356,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery( for _, r := range ldbs.backend.coverage.Query(c1, c2) { a.Z, b.Z = int16(r.Z), int16(r.Z) a.X, b.X = int16(r.X1), int16(r.X2) + ldbs.backend.suspend() for a.Y = r.Y2; a.Y >= r.Y1; a.Y-- { b.Y = a.Y // The keys in the database are stored and ordered as strings @@ -415,6 +447,7 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery( it.Next() } else { next := common.BigMin(zmin, zmax, zcode) + ldbs.backend.suspend() //log.Printf("seeking to: %d\n", next) it.Seek(common.ToBigEndian(next)) //log.Printf("seeking done: %d\n", next)