From aad612d097bb2377369aa20e55ccc3250599a929 Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Mon, 20 Mar 2017 12:41:24 +0100 Subject: [PATCH 1/5] Experimental increased priorities for typical mt-server ops. --- cmd/mtredisalize/leveldb.go | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index f218ba4..1e9f1a5 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -23,6 +23,9 @@ type LevelDBBackend struct { changeTracker *changeTracker mutex sync.RWMutex + + priority *sync.Cond + priorityCount int } type LevelDBSession struct { @@ -72,7 +75,8 @@ func NewLeveDBBackend( interleaved: interleaved, encoder: encoder, decoder: decoder, - changeTracker: changeTracker} + changeTracker: changeTracker, + priority: sync.NewCond(new(sync.Mutex))} if !interleaved { if err = ldb.buildCoverage(); err != nil { @@ -84,6 +88,28 @@ func NewLeveDBBackend( return } +func (ldb *LevelDBBackend) suspend() { + ldb.priority.L.Lock() + for ldb.priorityCount > 0 { + ldb.priority.Wait() + } + ldb.priority.L.Unlock() +} + +func (ldb *LevelDBBackend) grab() { + ldb.priority.L.Lock() + ldb.priorityCount++ + ldb.priority.L.Unlock() +} + +func (ldb *LevelDBBackend) ungrab() { + ldb.priority.L.Lock() + if ldb.priorityCount--; ldb.priorityCount <= 0 { + ldb.priority.Broadcast() + } + ldb.priority.L.Unlock() +} + func (ldb *LevelDBBackend) buildCoverage() error { log.Println("INFO: Start building coverage index (this may take some time)...") @@ -169,6 +195,9 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { return } ldbs.backend.doRead(func(db *leveldb.DB) { + ldbs.backend.grab() + defer ldbs.backend.ungrab() + ro := leveldb.NewReadOptions() value, err = ldbs.backend.db.Get(ro, key) //if err != nil { @@ -202,6 +231,8 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err return } ldbs.backend.doWrite(func(db *leveldb.DB) { + ldbs.backend.grab() + defer ldbs.backend.ungrab() if exists, err = keyExists(db, key); err != nil { return } @@ -424,6 +455,7 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery( //log.Printf("seeking to: %d\n", zmin) it.Seek(common.ToBigEndian(zmin)) for it.Valid() { + ldbs.backend.suspend() zcode := common.FromBigEndian(it.Key()) if zcode > zmax { From e4ad3a84d81aaa12b7c74d7a8ee11cfbeb06f9fd Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Tue, 21 Mar 2017 08:40:44 +0100 Subject: [PATCH 2/5] Removed locking from LevelDB entirely. It is not needed because LevelDB itself handles concurreny issues and it causes a lot of contentions and massive lags. --- cmd/mtredisalize/leveldb.go | 139 ++++++++++-------------------------- 1 file changed, 39 insertions(+), 100 deletions(-) diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index 1e9f1a5..c4257e2 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -6,7 +6,6 @@ package main import ( "log" - "sync" "bitbucket.org/s_l_teichmann/mtsatellite/common" @@ -22,10 +21,6 @@ type LevelDBBackend struct { decoder common.KeyTranscoder changeTracker *changeTracker - mutex sync.RWMutex - - priority *sync.Cond - priorityCount int } type LevelDBSession struct { @@ -76,7 +71,7 @@ func NewLeveDBBackend( encoder: encoder, decoder: decoder, changeTracker: changeTracker, - priority: sync.NewCond(new(sync.Mutex))} + } if !interleaved { if err = ldb.buildCoverage(); err != nil { @@ -88,28 +83,6 @@ func NewLeveDBBackend( return } -func (ldb *LevelDBBackend) suspend() { - ldb.priority.L.Lock() - for ldb.priorityCount > 0 { - ldb.priority.Wait() - } - ldb.priority.L.Unlock() -} - -func (ldb *LevelDBBackend) grab() { - ldb.priority.L.Lock() - ldb.priorityCount++ - ldb.priority.L.Unlock() -} - -func (ldb *LevelDBBackend) ungrab() { - ldb.priority.L.Lock() - if ldb.priorityCount--; ldb.priorityCount <= 0 { - ldb.priority.Broadcast() - } - ldb.priority.L.Unlock() -} - func (ldb *LevelDBBackend) buildCoverage() error { log.Println("INFO: Start building coverage index (this may take some time)...") @@ -135,18 +108,6 @@ func (ldb *LevelDBBackend) buildCoverage() error { return nil } -func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) { - ldb.mutex.RLock() - f(ldb.db) - ldb.mutex.RUnlock() -} - -func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) { - ldb.mutex.Lock() - f(ldb.db) - ldb.mutex.Unlock() -} - func (ldb *LevelDBBackend) NewSession() (Session, error) { return &LevelDBSession{ldb, nil}, nil } @@ -170,23 +131,21 @@ func (ldbs *LevelDBSession) Del(hash, key []byte) (success bool, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } - ldbs.backend.doWrite(func(db *leveldb.DB) { - ro := leveldb.NewReadOptions() - defer ro.Close() - var data []byte - data, err = ldbs.backend.db.Get(ro, key) - if err != nil { - return - } - if data == nil { - success = false - return - } - success = true - wo := leveldb.NewWriteOptions() - defer wo.Close() - err = ldbs.backend.db.Delete(wo, key) - }) + ro := leveldb.NewReadOptions() + defer ro.Close() + var data []byte + data, err = ldbs.backend.db.Get(ro, key) + if err != nil { + return + } + if data == nil { + success = false + return + } + success = true + wo := leveldb.NewWriteOptions() + defer wo.Close() + err = ldbs.backend.db.Delete(wo, key) return } @@ -194,19 +153,14 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } - ldbs.backend.doRead(func(db *leveldb.DB) { - ldbs.backend.grab() - defer ldbs.backend.ungrab() - - ro := leveldb.NewReadOptions() - value, err = ldbs.backend.db.Get(ro, key) - //if err != nil { - // log.Printf("Fetch key '%s' failed.\n", key) - //} else { - // log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value)) - //} - ro.Close() - }) + ro := leveldb.NewReadOptions() + value, err = ldbs.backend.db.Get(ro, key) + //if err != nil { + // log.Printf("Fetch key '%s' failed.\n", key) + //} else { + // log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value)) + //} + ro.Close() return } @@ -230,20 +184,16 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err if key, err = ldbs.backend.decoder(key); err != nil { return } - ldbs.backend.doWrite(func(db *leveldb.DB) { - ldbs.backend.grab() - defer ldbs.backend.ungrab() - if exists, err = keyExists(db, key); err != nil { - return - } - if ldbs.tx != nil { - ldbs.tx.Put(key, value) - return - } - wo := leveldb.NewWriteOptions() - err = db.Put(wo, key, value) - wo.Close() - }) + if exists, err = keyExists(ldbs.backend.db, key); err != nil { + return + } + if ldbs.tx != nil { + ldbs.tx.Put(key, value) + return + } + wo := leveldb.NewWriteOptions() + err = ldbs.backend.db.Put(wo, key, value) + wo.Close() if err != nil { return } @@ -276,13 +226,11 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) { return } ldbs.tx = nil - ldbs.backend.doWrite(func(db *leveldb.DB) { - wo := leveldb.NewWriteOptions() - wo.SetSync(true) - err = db.Write(wo, tx) - wo.Close() - tx.Close() - }) + wo := leveldb.NewWriteOptions() + wo.SetSync(true) + err = ldbs.backend.db.Write(wo, tx) + wo.Close() + tx.Close() return } @@ -290,8 +238,6 @@ func (ldbs *LevelDBSession) AllKeys( hash []byte, done <-chan struct{}) (<-chan []byte, int, error) { - ldbs.backend.mutex.RLock() - ro := leveldb.NewReadOptions() ro.SetFillCache(false) @@ -305,14 +251,12 @@ func (ldbs *LevelDBSession) AllKeys( if err := it.GetError(); err != nil { it.Close() ro.Close() - ldbs.backend.mutex.RUnlock() return nil, n, err } keys := make(chan []byte) go func() { - ldbs.backend.mutex.RUnlock() defer ro.Close() defer close(keys) defer it.Close() @@ -371,8 +315,6 @@ func (ldbs *LevelDBSession) plainSpatialQuery( go func() { defer close(blocks) - ldbs.backend.mutex.RLock() - defer ldbs.backend.mutex.RUnlock() ro := leveldb.NewReadOptions() defer ro.Close() @@ -433,8 +375,6 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery( go func() { defer close(blocks) - ldbs.backend.mutex.RLock() - defer ldbs.backend.mutex.RUnlock() ro := leveldb.NewReadOptions() defer ro.Close() @@ -455,7 +395,6 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery( //log.Printf("seeking to: %d\n", zmin) it.Seek(common.ToBigEndian(zmin)) for it.Valid() { - ldbs.backend.suspend() zcode := common.FromBigEndian(it.Key()) if zcode > zmax { From 54a4527fdab603f0a740520f1f73fc1652892026 Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Tue, 21 Mar 2017 16:05:59 +0100 Subject: [PATCH 3/5] Fixed problem raised from the removal of the doWrite closure. --- cmd/mtredisalize/leveldb.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index c4257e2..591afbd 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -190,12 +190,13 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err if ldbs.tx != nil { ldbs.tx.Put(key, value) return - } - wo := leveldb.NewWriteOptions() - err = ldbs.backend.db.Put(wo, key, value) - wo.Close() - if err != nil { - return + } else { + wo := leveldb.NewWriteOptions() + err = ldbs.backend.db.Put(wo, key, value) + wo.Close() + if err != nil { + return + } } // This technically too early because this is done in a transactions // which are commited (and possible fail) later. From 65119c1ee4985cd695e9a6ece3825f18864255ef Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Tue, 21 Mar 2017 16:25:58 +0100 Subject: [PATCH 4/5] Priotize again. --- cmd/mtredisalize/leveldb.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index 591afbd..1e5abd6 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -6,6 +6,7 @@ package main import ( "log" + "sync" "bitbucket.org/s_l_teichmann/mtsatellite/common" @@ -21,6 +22,9 @@ type LevelDBBackend struct { decoder common.KeyTranscoder changeTracker *changeTracker + + priorityCond *sync.Cond + priority int } type LevelDBSession struct { @@ -71,6 +75,7 @@ func NewLeveDBBackend( encoder: encoder, decoder: decoder, changeTracker: changeTracker, + priorityCond: sync.NewCond(new(sync.Mutex)), } if !interleaved { @@ -83,6 +88,28 @@ func NewLeveDBBackend( return } +func (ldb *LevelDBBackend) suspend() { + ldb.priorityCond.L.Lock() + for ldb.priority > 0 { + ldb.priorityCond.Wait() + } + ldb.priorityCond.L.Unlock() +} + +func (ldb *LevelDBBackend) grab() { + ldb.priorityCond.L.Lock() + ldb.priority++ + ldb.priorityCond.L.Unlock() +} + +func (ldb *LevelDBBackend) ungrab() { + ldb.priorityCond.L.Lock() + if ldb.priority--; ldb.priority <= 0 { + ldb.priorityCond.Broadcast() + } + ldb.priorityCond.L.Unlock() +} + func (ldb *LevelDBBackend) buildCoverage() error { log.Println("INFO: Start building coverage index (this may take some time)...") @@ -153,6 +180,8 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } + ldbs.backend.grab() + defer ldbs.backend.ungrab() ro := leveldb.NewReadOptions() value, err = ldbs.backend.db.Get(ro, key) //if err != nil { @@ -184,6 +213,8 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err if key, err = ldbs.backend.decoder(key); err != nil { return } + ldbs.backend.grab() + defer ldbs.backend.ungrab() if exists, err = keyExists(ldbs.backend.db, key); err != nil { return } @@ -325,6 +356,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery( for _, r := range ldbs.backend.coverage.Query(c1, c2) { a.Z, b.Z = int16(r.Z), int16(r.Z) a.X, b.X = int16(r.X1), int16(r.X2) + ldbs.backend.suspend() for a.Y = r.Y2; a.Y >= r.Y1; a.Y-- { b.Y = a.Y // The keys in the database are stored and ordered as strings @@ -415,6 +447,7 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery( it.Next() } else { next := common.BigMin(zmin, zmax, zcode) + ldbs.backend.suspend() //log.Printf("seeking to: %d\n", next) it.Seek(common.ToBigEndian(next)) //log.Printf("seeking done: %d\n", next) From c067fc0618923a9e1399973eb2333c38a5b6aa3b Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Sat, 15 Apr 2017 13:17:31 +0200 Subject: [PATCH 5/5] Backed out changeset 2f388b07f19a (no measurable effect) --- cmd/mtredisalize/leveldb.go | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index 1e5abd6..591afbd 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -6,7 +6,6 @@ package main import ( "log" - "sync" "bitbucket.org/s_l_teichmann/mtsatellite/common" @@ -22,9 +21,6 @@ type LevelDBBackend struct { decoder common.KeyTranscoder changeTracker *changeTracker - - priorityCond *sync.Cond - priority int } type LevelDBSession struct { @@ -75,7 +71,6 @@ func NewLeveDBBackend( encoder: encoder, decoder: decoder, changeTracker: changeTracker, - priorityCond: sync.NewCond(new(sync.Mutex)), } if !interleaved { @@ -88,28 +83,6 @@ func NewLeveDBBackend( return } -func (ldb *LevelDBBackend) suspend() { - ldb.priorityCond.L.Lock() - for ldb.priority > 0 { - ldb.priorityCond.Wait() - } - ldb.priorityCond.L.Unlock() -} - -func (ldb *LevelDBBackend) grab() { - ldb.priorityCond.L.Lock() - ldb.priority++ - ldb.priorityCond.L.Unlock() -} - -func (ldb *LevelDBBackend) ungrab() { - ldb.priorityCond.L.Lock() - if ldb.priority--; ldb.priority <= 0 { - ldb.priorityCond.Broadcast() - } - ldb.priorityCond.L.Unlock() -} - func (ldb *LevelDBBackend) buildCoverage() error { log.Println("INFO: Start building coverage index (this may take some time)...") @@ -180,8 +153,6 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } - ldbs.backend.grab() - defer ldbs.backend.ungrab() ro := leveldb.NewReadOptions() value, err = ldbs.backend.db.Get(ro, key) //if err != nil { @@ -213,8 +184,6 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err if key, err = ldbs.backend.decoder(key); err != nil { return } - ldbs.backend.grab() - defer ldbs.backend.ungrab() if exists, err = keyExists(ldbs.backend.db, key); err != nil { return } @@ -356,7 +325,6 @@ func (ldbs *LevelDBSession) plainSpatialQuery( for _, r := range ldbs.backend.coverage.Query(c1, c2) { a.Z, b.Z = int16(r.Z), int16(r.Z) a.X, b.X = int16(r.X1), int16(r.X2) - ldbs.backend.suspend() for a.Y = r.Y2; a.Y >= r.Y1; a.Y-- { b.Y = a.Y // The keys in the database are stored and ordered as strings @@ -447,7 +415,6 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery( it.Next() } else { next := common.BigMin(zmin, zmax, zcode) - ldbs.backend.suspend() //log.Printf("seeking to: %d\n", next) it.Seek(common.ToBigEndian(next)) //log.Printf("seeking done: %d\n", next)