diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index f218ba4..591afbd 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -6,7 +6,6 @@ package main import ( "log" - "sync" "bitbucket.org/s_l_teichmann/mtsatellite/common" @@ -22,7 +21,6 @@ type LevelDBBackend struct { decoder common.KeyTranscoder changeTracker *changeTracker - mutex sync.RWMutex } type LevelDBSession struct { @@ -72,7 +70,8 @@ func NewLeveDBBackend( interleaved: interleaved, encoder: encoder, decoder: decoder, - changeTracker: changeTracker} + changeTracker: changeTracker, + } if !interleaved { if err = ldb.buildCoverage(); err != nil { @@ -109,18 +108,6 @@ func (ldb *LevelDBBackend) buildCoverage() error { return nil } -func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) { - ldb.mutex.RLock() - f(ldb.db) - ldb.mutex.RUnlock() -} - -func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) { - ldb.mutex.Lock() - f(ldb.db) - ldb.mutex.Unlock() -} - func (ldb *LevelDBBackend) NewSession() (Session, error) { return &LevelDBSession{ldb, nil}, nil } @@ -144,23 +131,21 @@ func (ldbs *LevelDBSession) Del(hash, key []byte) (success bool, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } - ldbs.backend.doWrite(func(db *leveldb.DB) { - ro := leveldb.NewReadOptions() - defer ro.Close() - var data []byte - data, err = ldbs.backend.db.Get(ro, key) - if err != nil { - return - } - if data == nil { - success = false - return - } - success = true - wo := leveldb.NewWriteOptions() - defer wo.Close() - err = ldbs.backend.db.Delete(wo, key) - }) + ro := leveldb.NewReadOptions() + defer ro.Close() + var data []byte + data, err = ldbs.backend.db.Get(ro, key) + if err != nil { + return + } + if data == nil { + success = false + return + } + success = true + wo := leveldb.NewWriteOptions() + defer wo.Close() + err = ldbs.backend.db.Delete(wo, key) return } @@ -168,16 +153,14 @@ func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } - ldbs.backend.doRead(func(db *leveldb.DB) { - ro := leveldb.NewReadOptions() - value, err = ldbs.backend.db.Get(ro, key) - //if err != nil { - // log.Printf("Fetch key '%s' failed.\n", key) - //} else { - // log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value)) - //} - ro.Close() - }) + ro := leveldb.NewReadOptions() + value, err = ldbs.backend.db.Get(ro, key) + //if err != nil { + // log.Printf("Fetch key '%s' failed.\n", key) + //} else { + // log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value)) + //} + ro.Close() return } @@ -201,21 +184,20 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err if key, err = ldbs.backend.decoder(key); err != nil { return } - ldbs.backend.doWrite(func(db *leveldb.DB) { - if exists, err = keyExists(db, key); err != nil { - return - } - if ldbs.tx != nil { - ldbs.tx.Put(key, value) - return - } - wo := leveldb.NewWriteOptions() - err = db.Put(wo, key, value) - wo.Close() - }) - if err != nil { + if exists, err = keyExists(ldbs.backend.db, key); err != nil { return } + if ldbs.tx != nil { + ldbs.tx.Put(key, value) + return + } else { + wo := leveldb.NewWriteOptions() + err = ldbs.backend.db.Put(wo, key, value) + wo.Close() + if err != nil { + return + } + } // This technically too early because this is done in a transactions // which are commited (and possible fail) later. if ldbs.backend.changeTracker != nil || ldbs.backend.coverage != nil { @@ -245,13 +227,11 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) { return } ldbs.tx = nil - ldbs.backend.doWrite(func(db *leveldb.DB) { - wo := leveldb.NewWriteOptions() - wo.SetSync(true) - err = db.Write(wo, tx) - wo.Close() - tx.Close() - }) + wo := leveldb.NewWriteOptions() + wo.SetSync(true) + err = ldbs.backend.db.Write(wo, tx) + wo.Close() + tx.Close() return } @@ -259,8 +239,6 @@ func (ldbs *LevelDBSession) AllKeys( hash []byte, done <-chan struct{}) (<-chan []byte, int, error) { - ldbs.backend.mutex.RLock() - ro := leveldb.NewReadOptions() ro.SetFillCache(false) @@ -274,14 +252,12 @@ func (ldbs *LevelDBSession) AllKeys( if err := it.GetError(); err != nil { it.Close() ro.Close() - ldbs.backend.mutex.RUnlock() return nil, n, err } keys := make(chan []byte) go func() { - ldbs.backend.mutex.RUnlock() defer ro.Close() defer close(keys) defer it.Close() @@ -340,8 +316,6 @@ func (ldbs *LevelDBSession) plainSpatialQuery( go func() { defer close(blocks) - ldbs.backend.mutex.RLock() - defer ldbs.backend.mutex.RUnlock() ro := leveldb.NewReadOptions() defer ro.Close() @@ -402,8 +376,6 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery( go func() { defer close(blocks) - ldbs.backend.mutex.RLock() - defer ldbs.backend.mutex.RUnlock() ro := leveldb.NewReadOptions() defer ro.Close()