From 60f43c9f526488501e4cbf104f8915830e754056 Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Tue, 26 Jul 2016 16:15:10 +0200 Subject: [PATCH 1/3] Implementation of Redis command HDEL. Needs testing. For issue #18. --- cmd/mtredisalize/backend.go | 2 ++ cmd/mtredisalize/connection.go | 27 +++++++++++++++++++---- cmd/mtredisalize/leveldb.go | 24 +++++++++++++++++++++ cmd/mtredisalize/redisparser.go | 13 +++++++++++ cmd/mtredisalize/sqlite.go | 38 +++++++++++++++++++++++++++++++++ 5 files changed, 100 insertions(+), 4 deletions(-) diff --git a/cmd/mtredisalize/backend.go b/cmd/mtredisalize/backend.go index e4a84b1..efc076e 100644 --- a/cmd/mtredisalize/backend.go +++ b/cmd/mtredisalize/backend.go @@ -15,6 +15,8 @@ type ( // Session is a database session. Session interface { + // Del deletes a block by a given key. + Del(hash, key []byte) (bool, error) // Fetch fetches the block data for a given position. Fetch(hash, key []byte) ([]byte, error) // InTransaction returns true if a transaction is running. diff --git a/cmd/mtredisalize/connection.go b/cmd/mtredisalize/connection.go index e03a7ba..3edb93a 100644 --- a/cmd/mtredisalize/connection.go +++ b/cmd/mtredisalize/connection.go @@ -21,6 +21,8 @@ var ( redisQueued = []byte("+QUEUED\r\n") redisTrue = []byte(":1\r\n") redisFalse = []byte(":0\r\n") + redisZero = []byte("0\r\n") + redisOne = []byte("1\r\n") ) type Connection struct { @@ -57,6 +59,20 @@ func logError(err error) bool { return true } +func (c *Connection) Hdel(hash, key []byte) bool { + + success, err := c.session.Del(hash, key) + if err != nil { + return c.writeError(err) + } + + if success { + return c.writeMessage(redisOne) + } + + return c.writeMessage(redisZero) +} + func (c *Connection) Hget(hash, key []byte) bool { var err error @@ -203,14 +219,17 @@ func (c *Connection) writeBoolArray(arr []bool) bool { return true } -func (c *Connection) writeOk() bool { - _, err := c.conn.Write(redisOk) +func (c *Connection) writeMessage(msg []byte) bool { + _, err := c.conn.Write(msg) return logError(err) } +func (c *Connection) writeOk() bool { + return c.writeMessage(redisOk) +} + func (c *Connection) writeQueued() bool { - _, err := c.conn.Write(redisQueued) - return logError(err) + return c.writeMessage(redisQueued) } func (c *Connection) writeBlock(data []byte) bool { diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index c01034c..3bffce9 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -140,6 +140,30 @@ func (ldb *LevelDBBackend) Shutdown() error { return nil } +func (ldbs *LevelDBSession) Del(hash, key []byte) (success bool, err error) { + if key, err = ldbs.backend.decoder(key); err != nil { + return + } + ldbs.backend.doRead(func(db *leveldb.DB) { + ro := leveldb.NewReadOptions() + defer ro.Close() + var data []byte + data, err = ldbs.backend.db.Get(ro, key) + if err != nil { + return + } + if data == nil { + success = false + return + } + success = true + wo := leveldb.NewWriteOptions() + defer wo.Close() + err = ldbs.backend.db.Delete(wo, key) + }) + return +} + func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return diff --git a/cmd/mtredisalize/redisparser.go b/cmd/mtredisalize/redisparser.go index 940d020..8fbf562 100644 --- a/cmd/mtredisalize/redisparser.go +++ b/cmd/mtredisalize/redisparser.go @@ -15,6 +15,7 @@ import ( ) type RedisCommands interface { + Hdel(hash, key []byte) bool Hget(hash, key []byte) bool Hset(hash, key, block []byte) bool Multi() bool @@ -161,6 +162,18 @@ func (rp *RedisParser) execute() bool { } cmd := strings.ToUpper(asString(rp.args[0])) switch cmd { + case "HDEL": + if l < 3 { + log.Println("WARN: Missing argments for HGET.") + return false + } + hash, ok1 := rp.args[1].([]byte) + key, ok2 := rp.args[2].([]byte) + if !ok1 || !ok2 { + log.Println("WARN: HDEL data are not byte slices.") + return false + } + return rp.commands.Hdel(hash, key) case "HGET": if l < 3 { log.Println("WARN: Missing argments for HGET.") diff --git a/cmd/mtredisalize/sqlite.go b/cmd/mtredisalize/sqlite.go index 1be493d..1f008d8 100644 --- a/cmd/mtredisalize/sqlite.go +++ b/cmd/mtredisalize/sqlite.go @@ -17,6 +17,7 @@ import ( var globalLock sync.RWMutex const ( + deleteSQL = "DELETE FROM blocks WHERE pos = ?" fetchSQL = "SELECT data FROM blocks WHERE pos = ?" existsSQL = "SELECT 1 FROM blocks WHERE pos = ?" updateSQL = "UPDATE blocks SET data = ? WHERE pos = ?" @@ -34,6 +35,7 @@ type SQLiteBackend struct { interleaved bool coverage *common.Coverage3D existsStmt *sql.Stmt + deleteStmt *sql.Stmt fetchStmt *sql.Stmt insertStmt *sql.Stmt updateStmt *sql.Stmt @@ -80,6 +82,11 @@ func NewSQLiteBackend( return } + if res.deleteStmt, err = res.db.Prepare(deleteSQL); err != nil { + res.closeAll() + return + } + if res.insertStmt, err = res.db.Prepare(insertSQL); err != nil { res.closeAll() return @@ -164,6 +171,7 @@ func closeDB(db **sql.DB) error { } func (sqlb *SQLiteBackend) closeAll() error { + closeStmt(&sqlb.deleteStmt) closeStmt(&sqlb.fetchStmt) closeStmt(&sqlb.insertStmt) closeStmt(&sqlb.updateStmt) @@ -188,6 +196,36 @@ func (ss *SQLiteSession) txStmt(stmt *sql.Stmt) *sql.Stmt { return stmt } +func (ss *SQLiteSession) Del(hash, key []byte) (success bool, err error) { + var pos int64 + if pos, err = ss.backend.decoder(key); err != nil { + return + } + + globalLock.Lock() + defer globalLock.Unlock() + + existsStmt := ss.txStmt(ss.backend.existsStmt) + var x int + err2 := existsStmt.QueryRow(pos).Scan(&x) + + if err2 == sql.ErrNoRows { + success = false + return + } + if err2 != nil { + err = err2 + return + } + + success = true + + deleteStmt := ss.txStmt(ss.backend.deleteStmt) + _, err = deleteStmt.Exec(pos) + + return +} + func (ss *SQLiteSession) Fetch(hash, key []byte) (data []byte, err error) { var pos int64 if pos, err = ss.backend.decoder(key); err != nil { From d6ddd047a47df11b50704d88ac39c00013e54768 Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Tue, 26 Jul 2016 16:32:24 +0200 Subject: [PATCH 2/3] Fixed and simplified redis network code. --- cmd/mtredisalize/connection.go | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/cmd/mtredisalize/connection.go b/cmd/mtredisalize/connection.go index 3edb93a..7b0cc83 100644 --- a/cmd/mtredisalize/connection.go +++ b/cmd/mtredisalize/connection.go @@ -21,8 +21,6 @@ var ( redisQueued = []byte("+QUEUED\r\n") redisTrue = []byte(":1\r\n") redisFalse = []byte(":0\r\n") - redisZero = []byte("0\r\n") - redisOne = []byte("1\r\n") ) type Connection struct { @@ -66,11 +64,7 @@ func (c *Connection) Hdel(hash, key []byte) bool { return c.writeError(err) } - if success { - return c.writeMessage(redisOne) - } - - return c.writeMessage(redisZero) + return c.writeBool(success) } func (c *Connection) Hget(hash, key []byte) bool { @@ -153,8 +147,7 @@ func (c *Connection) Hkeys(hash []byte) bool { } func (c *Connection) Ping() bool { - _, err := c.conn.Write(redisPong) - return logError(err) + return c.writeMessage(redisPong) } func (c *Connection) HSpatial(hash, first, second []byte) bool { @@ -183,23 +176,18 @@ func (c *Connection) HSpatial(hash, first, second []byte) bool { func (c *Connection) writeError(err error) bool { logError(err) - _, err = c.conn.Write(redisError) - return logError(err) + return c.writeMessage(redisError) } func (c *Connection) writeEmptyArray() bool { - _, err := c.conn.Write(redisEmptyArray) - return logError(err) + return c.writeMessage(redisEmptyArray) } func (c *Connection) writeBool(b bool) bool { - var err error if b { - _, err = c.conn.Write(redisTrue) - } else { - _, err = c.conn.Write(redisFalse) + return c.writeMessage(redisTrue) } - return logError(err) + return c.writeMessage(redisFalse) } func redisLength(prefix byte, s int) []byte { From 6b5f791a8ac938095cf1334b9090abed2996dbc5 Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Wed, 27 Jul 2016 16:38:54 +0200 Subject: [PATCH 3/3] Use write lock when executing block deletion. --- cmd/mtredisalize/leveldb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index 3bffce9..f218ba4 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -144,7 +144,7 @@ func (ldbs *LevelDBSession) Del(hash, key []byte) (success bool, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } - ldbs.backend.doRead(func(db *leveldb.DB) { + ldbs.backend.doWrite(func(db *leveldb.DB) { ro := leveldb.NewReadOptions() defer ro.Close() var data []byte