From e4ad3a84d81aaa12b7c74d7a8ee11cfbeb06f9fd Mon Sep 17 00:00:00 2001
From: "Sascha L. Teichmann" <sascha.teichmann@intevation.de>
Date: Tue, 21 Mar 2017 08:40:44 +0100
Subject: [PATCH] Removed locking from LevelDB entirely. It is not needed
 because LevelDB itself handles concurreny issues and it causes a lot of
 contentions and massive lags.

---
 cmd/mtredisalize/leveldb.go | 139 ++++++++++--------------------------
 1 file changed, 39 insertions(+), 100 deletions(-)

diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go
index 1e9f1a5..c4257e2 100644
--- a/cmd/mtredisalize/leveldb.go
+++ b/cmd/mtredisalize/leveldb.go
@@ -6,7 +6,6 @@ package main
 
 import (
 	"log"
-	"sync"
 
 	"bitbucket.org/s_l_teichmann/mtsatellite/common"
 
@@ -22,10 +21,6 @@ type LevelDBBackend struct {
 	decoder     common.KeyTranscoder
 
 	changeTracker *changeTracker
-	mutex         sync.RWMutex
-
-	priority      *sync.Cond
-	priorityCount int
 }
 
 type LevelDBSession struct {
@@ -76,7 +71,7 @@ func NewLeveDBBackend(
 		encoder:       encoder,
 		decoder:       decoder,
 		changeTracker: changeTracker,
-		priority:      sync.NewCond(new(sync.Mutex))}
+	}
 
 	if !interleaved {
 		if err = ldb.buildCoverage(); err != nil {
@@ -88,28 +83,6 @@ func NewLeveDBBackend(
 	return
 }
 
-func (ldb *LevelDBBackend) suspend() {
-	ldb.priority.L.Lock()
-	for ldb.priorityCount > 0 {
-		ldb.priority.Wait()
-	}
-	ldb.priority.L.Unlock()
-}
-
-func (ldb *LevelDBBackend) grab() {
-	ldb.priority.L.Lock()
-	ldb.priorityCount++
-	ldb.priority.L.Unlock()
-}
-
-func (ldb *LevelDBBackend) ungrab() {
-	ldb.priority.L.Lock()
-	if ldb.priorityCount--; ldb.priorityCount <= 0 {
-		ldb.priority.Broadcast()
-	}
-	ldb.priority.L.Unlock()
-}
-
 func (ldb *LevelDBBackend) buildCoverage() error {
 	log.Println("INFO: Start building coverage index (this may take some time)...")
 
@@ -135,18 +108,6 @@ func (ldb *LevelDBBackend) buildCoverage() error {
 	return nil
 }
 
-func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) {
-	ldb.mutex.RLock()
-	f(ldb.db)
-	ldb.mutex.RUnlock()
-}
-
-func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) {
-	ldb.mutex.Lock()
-	f(ldb.db)
-	ldb.mutex.Unlock()
-}
-
 func (ldb *LevelDBBackend) NewSession() (Session, error) {
 	return &LevelDBSession{ldb, nil}, nil
 }
@@ -170,23 +131,21 @@ func (ldbs *LevelDBSession) Del(hash, key []byte) (success bool, err error) {
 	if key, err = ldbs.backend.decoder(key); err != nil {
 		return
 	}
-	ldbs.backend.doWrite(func(db *leveldb.DB) {
-		ro := leveldb.NewReadOptions()
-		defer ro.Close()
-		var data []byte
-		data, err = ldbs.backend.db.Get(ro, key)
-		if err != nil {
-			return
-		}
-		if data == nil {
-			success = false
-			return
-		}
-		success = true
-		wo := leveldb.NewWriteOptions()
-		defer wo.Close()
-		err = ldbs.backend.db.Delete(wo, key)
-	})
+	ro := leveldb.NewReadOptions()
+	defer ro.Close()
+	var data []byte
+	data, err = ldbs.backend.db.Get(ro, key)
+	if err != nil {
+		return
+	}
+	if data == nil {
+		success = false
+		return
+	}
+	success = true
+	wo := leveldb.NewWriteOptions()
+	defer wo.Close()
+	err = ldbs.backend.db.Delete(wo, key)
 	return
 }
 
@@ -194,19 +153,14 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) {
 	if key, err = ldbs.backend.decoder(key); err != nil {
 		return
 	}
-	ldbs.backend.doRead(func(db *leveldb.DB) {
-		ldbs.backend.grab()
-		defer ldbs.backend.ungrab()
-
-		ro := leveldb.NewReadOptions()
-		value, err = ldbs.backend.db.Get(ro, key)
-		//if err != nil {
-		//	log.Printf("Fetch key '%s' failed.\n", key)
-		//} else {
-		//  log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value))
-		//}
-		ro.Close()
-	})
+	ro := leveldb.NewReadOptions()
+	value, err = ldbs.backend.db.Get(ro, key)
+	//if err != nil {
+	//	log.Printf("Fetch key '%s' failed.\n", key)
+	//} else {
+	//  log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value))
+	//}
+	ro.Close()
 	return
 }
 
@@ -230,20 +184,16 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err
 	if key, err = ldbs.backend.decoder(key); err != nil {
 		return
 	}
-	ldbs.backend.doWrite(func(db *leveldb.DB) {
-		ldbs.backend.grab()
-		defer ldbs.backend.ungrab()
-		if exists, err = keyExists(db, key); err != nil {
-			return
-		}
-		if ldbs.tx != nil {
-			ldbs.tx.Put(key, value)
-			return
-		}
-		wo := leveldb.NewWriteOptions()
-		err = db.Put(wo, key, value)
-		wo.Close()
-	})
+	if exists, err = keyExists(ldbs.backend.db, key); err != nil {
+		return
+	}
+	if ldbs.tx != nil {
+		ldbs.tx.Put(key, value)
+		return
+	}
+	wo := leveldb.NewWriteOptions()
+	err = ldbs.backend.db.Put(wo, key, value)
+	wo.Close()
 	if err != nil {
 		return
 	}
@@ -276,13 +226,11 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) {
 		return
 	}
 	ldbs.tx = nil
-	ldbs.backend.doWrite(func(db *leveldb.DB) {
-		wo := leveldb.NewWriteOptions()
-		wo.SetSync(true)
-		err = db.Write(wo, tx)
-		wo.Close()
-		tx.Close()
-	})
+	wo := leveldb.NewWriteOptions()
+	wo.SetSync(true)
+	err = ldbs.backend.db.Write(wo, tx)
+	wo.Close()
+	tx.Close()
 	return
 }
 
@@ -290,8 +238,6 @@ func (ldbs *LevelDBSession) AllKeys(
 	hash []byte,
 	done <-chan struct{}) (<-chan []byte, int, error) {
 
-	ldbs.backend.mutex.RLock()
-
 	ro := leveldb.NewReadOptions()
 	ro.SetFillCache(false)
 
@@ -305,14 +251,12 @@ func (ldbs *LevelDBSession) AllKeys(
 	if err := it.GetError(); err != nil {
 		it.Close()
 		ro.Close()
-		ldbs.backend.mutex.RUnlock()
 		return nil, n, err
 	}
 
 	keys := make(chan []byte)
 
 	go func() {
-		ldbs.backend.mutex.RUnlock()
 		defer ro.Close()
 		defer close(keys)
 		defer it.Close()
@@ -371,8 +315,6 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
 
 	go func() {
 		defer close(blocks)
-		ldbs.backend.mutex.RLock()
-		defer ldbs.backend.mutex.RUnlock()
 
 		ro := leveldb.NewReadOptions()
 		defer ro.Close()
@@ -433,8 +375,6 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery(
 
 	go func() {
 		defer close(blocks)
-		ldbs.backend.mutex.RLock()
-		defer ldbs.backend.mutex.RUnlock()
 
 		ro := leveldb.NewReadOptions()
 		defer ro.Close()
@@ -455,7 +395,6 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery(
 		//log.Printf("seeking to: %d\n", zmin)
 		it.Seek(common.ToBigEndian(zmin))
 		for it.Valid() {
-			ldbs.backend.suspend()
 			zcode := common.FromBigEndian(it.Key())
 
 			if zcode > zmax {