Merged hdel-redis-command branch which allows to call /deleteblocks from MT server.

This commit is contained in:
Sascha L. Teichmann 2016-07-28 10:43:20 +02:00
commit 9b6427fa2e
5 changed files with 99 additions and 15 deletions

View File

@ -15,6 +15,8 @@ type (
// Session is a database session. // Session is a database session.
Session interface { Session interface {
// Del deletes a block by a given key.
Del(hash, key []byte) (bool, error)
// Fetch fetches the block data for a given position. // Fetch fetches the block data for a given position.
Fetch(hash, key []byte) ([]byte, error) Fetch(hash, key []byte) ([]byte, error)
// InTransaction returns true if a transaction is running. // InTransaction returns true if a transaction is running.

View File

@ -57,6 +57,16 @@ func logError(err error) bool {
return true return true
} }
func (c *Connection) Hdel(hash, key []byte) bool {
success, err := c.session.Del(hash, key)
if err != nil {
return c.writeError(err)
}
return c.writeBool(success)
}
func (c *Connection) Hget(hash, key []byte) bool { func (c *Connection) Hget(hash, key []byte) bool {
var err error var err error
@ -137,8 +147,7 @@ func (c *Connection) Hkeys(hash []byte) bool {
} }
func (c *Connection) Ping() bool { func (c *Connection) Ping() bool {
_, err := c.conn.Write(redisPong) return c.writeMessage(redisPong)
return logError(err)
} }
func (c *Connection) HSpatial(hash, first, second []byte) bool { func (c *Connection) HSpatial(hash, first, second []byte) bool {
@ -167,23 +176,18 @@ func (c *Connection) HSpatial(hash, first, second []byte) bool {
func (c *Connection) writeError(err error) bool { func (c *Connection) writeError(err error) bool {
logError(err) logError(err)
_, err = c.conn.Write(redisError) return c.writeMessage(redisError)
return logError(err)
} }
func (c *Connection) writeEmptyArray() bool { func (c *Connection) writeEmptyArray() bool {
_, err := c.conn.Write(redisEmptyArray) return c.writeMessage(redisEmptyArray)
return logError(err)
} }
func (c *Connection) writeBool(b bool) bool { func (c *Connection) writeBool(b bool) bool {
var err error
if b { if b {
_, err = c.conn.Write(redisTrue) return c.writeMessage(redisTrue)
} else {
_, err = c.conn.Write(redisFalse)
} }
return logError(err) return c.writeMessage(redisFalse)
} }
func redisLength(prefix byte, s int) []byte { func redisLength(prefix byte, s int) []byte {
@ -203,14 +207,17 @@ func (c *Connection) writeBoolArray(arr []bool) bool {
return true return true
} }
func (c *Connection) writeOk() bool { func (c *Connection) writeMessage(msg []byte) bool {
_, err := c.conn.Write(redisOk) _, err := c.conn.Write(msg)
return logError(err) return logError(err)
} }
func (c *Connection) writeOk() bool {
return c.writeMessage(redisOk)
}
func (c *Connection) writeQueued() bool { func (c *Connection) writeQueued() bool {
_, err := c.conn.Write(redisQueued) return c.writeMessage(redisQueued)
return logError(err)
} }
func (c *Connection) writeBlock(data []byte) bool { func (c *Connection) writeBlock(data []byte) bool {

View File

@ -140,6 +140,30 @@ func (ldb *LevelDBBackend) Shutdown() error {
return nil return nil
} }
func (ldbs *LevelDBSession) Del(hash, key []byte) (success bool, err error) {
if key, err = ldbs.backend.decoder(key); err != nil {
return
}
ldbs.backend.doWrite(func(db *leveldb.DB) {
ro := leveldb.NewReadOptions()
defer ro.Close()
var data []byte
data, err = ldbs.backend.db.Get(ro, key)
if err != nil {
return
}
if data == nil {
success = false
return
}
success = true
wo := leveldb.NewWriteOptions()
defer wo.Close()
err = ldbs.backend.db.Delete(wo, key)
})
return
}
func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) {
if key, err = ldbs.backend.decoder(key); err != nil { if key, err = ldbs.backend.decoder(key); err != nil {
return return

View File

@ -15,6 +15,7 @@ import (
) )
type RedisCommands interface { type RedisCommands interface {
Hdel(hash, key []byte) bool
Hget(hash, key []byte) bool Hget(hash, key []byte) bool
Hset(hash, key, block []byte) bool Hset(hash, key, block []byte) bool
Multi() bool Multi() bool
@ -161,6 +162,18 @@ func (rp *RedisParser) execute() bool {
} }
cmd := strings.ToUpper(asString(rp.args[0])) cmd := strings.ToUpper(asString(rp.args[0]))
switch cmd { switch cmd {
case "HDEL":
if l < 3 {
log.Println("WARN: Missing argments for HGET.")
return false
}
hash, ok1 := rp.args[1].([]byte)
key, ok2 := rp.args[2].([]byte)
if !ok1 || !ok2 {
log.Println("WARN: HDEL data are not byte slices.")
return false
}
return rp.commands.Hdel(hash, key)
case "HGET": case "HGET":
if l < 3 { if l < 3 {
log.Println("WARN: Missing argments for HGET.") log.Println("WARN: Missing argments for HGET.")

View File

@ -17,6 +17,7 @@ import (
var globalLock sync.RWMutex var globalLock sync.RWMutex
const ( const (
deleteSQL = "DELETE FROM blocks WHERE pos = ?"
fetchSQL = "SELECT data FROM blocks WHERE pos = ?" fetchSQL = "SELECT data FROM blocks WHERE pos = ?"
existsSQL = "SELECT 1 FROM blocks WHERE pos = ?" existsSQL = "SELECT 1 FROM blocks WHERE pos = ?"
updateSQL = "UPDATE blocks SET data = ? WHERE pos = ?" updateSQL = "UPDATE blocks SET data = ? WHERE pos = ?"
@ -34,6 +35,7 @@ type SQLiteBackend struct {
interleaved bool interleaved bool
coverage *common.Coverage3D coverage *common.Coverage3D
existsStmt *sql.Stmt existsStmt *sql.Stmt
deleteStmt *sql.Stmt
fetchStmt *sql.Stmt fetchStmt *sql.Stmt
insertStmt *sql.Stmt insertStmt *sql.Stmt
updateStmt *sql.Stmt updateStmt *sql.Stmt
@ -80,6 +82,11 @@ func NewSQLiteBackend(
return return
} }
if res.deleteStmt, err = res.db.Prepare(deleteSQL); err != nil {
res.closeAll()
return
}
if res.insertStmt, err = res.db.Prepare(insertSQL); err != nil { if res.insertStmt, err = res.db.Prepare(insertSQL); err != nil {
res.closeAll() res.closeAll()
return return
@ -164,6 +171,7 @@ func closeDB(db **sql.DB) error {
} }
func (sqlb *SQLiteBackend) closeAll() error { func (sqlb *SQLiteBackend) closeAll() error {
closeStmt(&sqlb.deleteStmt)
closeStmt(&sqlb.fetchStmt) closeStmt(&sqlb.fetchStmt)
closeStmt(&sqlb.insertStmt) closeStmt(&sqlb.insertStmt)
closeStmt(&sqlb.updateStmt) closeStmt(&sqlb.updateStmt)
@ -188,6 +196,36 @@ func (ss *SQLiteSession) txStmt(stmt *sql.Stmt) *sql.Stmt {
return stmt return stmt
} }
func (ss *SQLiteSession) Del(hash, key []byte) (success bool, err error) {
var pos int64
if pos, err = ss.backend.decoder(key); err != nil {
return
}
globalLock.Lock()
defer globalLock.Unlock()
existsStmt := ss.txStmt(ss.backend.existsStmt)
var x int
err2 := existsStmt.QueryRow(pos).Scan(&x)
if err2 == sql.ErrNoRows {
success = false
return
}
if err2 != nil {
err = err2
return
}
success = true
deleteStmt := ss.txStmt(ss.backend.deleteStmt)
_, err = deleteStmt.Exec(pos)
return
}
func (ss *SQLiteSession) Fetch(hash, key []byte) (data []byte, err error) { func (ss *SQLiteSession) Fetch(hash, key []byte) (data []byte, err error) {
var pos int64 var pos int64
if pos, err = ss.backend.decoder(key); err != nil { if pos, err = ss.backend.decoder(key); err != nil {