mtredisalize: Removed a useless level of abstraction in redis parser.

This commit is contained in:
Sascha L. Teichmann 2016-04-25 11:43:50 +02:00
parent 14ae6d3873
commit 72bd5ec77a
2 changed files with 65 additions and 83 deletions

View File

@ -43,9 +43,8 @@ func (c *Connection) Run() {
c.session.Close()
c.conn.Close()
}()
rce := NewRedisCommandExecutor(c)
r := bufio.NewReaderSize(c.conn, 8*1024)
parser := NewRedisParser(r, rce, c.maxBulkStringSize)
parser := NewRedisParser(r, c, c.maxBulkStringSize)
parser.Parse()
log.Println("client disconnected")
}

View File

@ -14,25 +14,31 @@ import (
"strings"
)
type RedisConsumer interface {
ConsumeInteger(int64) bool
ConsumeArray(int64) bool
ConsumeSimpleString(string) bool
ConsumeBulkString([]byte) bool
ConsumeError(error) bool
type RedisCommands interface {
Hget(hash, key []byte) bool
Hset(hash, key, block []byte) bool
Multi() bool
Exec() bool
Hkeys(hash []byte) bool
HSpatial(hash, first, second []byte) bool
Ping() bool
}
type RedisParser struct {
reader *bufio.Reader
consumer RedisConsumer
commands RedisCommands
missing int64
args []interface{}
maxBulkStringSize int64
}
func NewRedisParser(reader *bufio.Reader, consumer RedisConsumer,
func NewRedisParser(reader *bufio.Reader,
commands RedisCommands,
maxBulkStringSize int64) *RedisParser {
return &RedisParser{
reader: reader,
consumer: consumer,
commands: commands,
maxBulkStringSize: maxBulkStringSize}
}
@ -45,7 +51,7 @@ func (rp *RedisParser) nextLine() []byte {
line, err := rp.reader.ReadBytes('\n')
if err != nil {
if err != io.EOF {
rp.consumer.ConsumeError(err)
rp.consumeError(err)
}
return nil
}
@ -72,15 +78,15 @@ func (rp *RedisParser) dispatch(line []byte) bool {
}
func (rp *RedisParser) simpleString(line []byte) bool {
return rp.consumer.ConsumeSimpleString(string(line[1:]))
return rp.consumeSimpleString(string(line[1:]))
}
func (rp *RedisParser) integer(line []byte) bool {
i, err := strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return rp.consumer.ConsumeError(err)
return rp.consumeError(err)
}
return rp.consumer.ConsumeInteger(i)
return rp.consumeInteger(i)
}
func (rp *RedisParser) bulkString(line []byte) bool {
@ -88,30 +94,30 @@ func (rp *RedisParser) bulkString(line []byte) bool {
var err error
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return rp.consumer.ConsumeError(err)
return rp.consumeError(err)
}
switch {
case i < 0:
return rp.consumer.ConsumeBulkString(nil)
return rp.consumeBulkString(nil)
case i == 0:
return rp.consumer.ConsumeBulkString([]byte{})
return rp.consumeBulkString([]byte{})
default:
if i > rp.maxBulkStringSize { // prevent denial of service.
return rp.consumer.ConsumeError(
return rp.consumeError(
fmt.Errorf("Bulk string too large (%d bytes).\n", i))
}
data := make([]byte, i)
for rest := i; rest > 0; {
var n int
if n, err = rp.reader.Read(data[i-rest : i]); err != nil {
return rp.consumer.ConsumeError(err)
return rp.consumeError(err)
}
rest -= int64(n)
}
if _, err = rp.reader.ReadBytes('\n'); err != nil {
return rp.consumer.ConsumeError(err)
return rp.consumeError(err)
}
return rp.consumer.ConsumeBulkString(data)
return rp.consumeBulkString(data)
}
}
@ -120,41 +126,18 @@ func (rp *RedisParser) array(line []byte) bool {
var err error
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return rp.consumer.ConsumeError(err)
return rp.consumeError(err)
}
return rp.consumer.ConsumeArray(i)
return rp.consumeArray(i)
}
type RedisCommands interface {
Hget(hash, key []byte) bool
Hset(hash, key, block []byte) bool
Multi() bool
Exec() bool
Hkeys(hash []byte) bool
HSpatial(hash, first, second []byte) bool
Ping() bool
}
type RedisCommandExecutor struct {
commands RedisCommands
missing int64
args []interface{}
}
func NewRedisCommandExecutor(commands RedisCommands) *RedisCommandExecutor {
return &RedisCommandExecutor{
commands: commands,
missing: 0,
args: []interface{}{}}
}
func (rce *RedisCommandExecutor) push(i interface{}) bool {
rce.args = append(rce.args, i)
rce.missing--
if rce.missing <= 0 {
rce.missing = 0
res := rce.execute()
rce.args = []interface{}{}
func (rp *RedisParser) push(i interface{}) bool {
rp.args = append(rp.args, i)
rp.missing--
if rp.missing <= 0 {
rp.missing = 0
res := rp.execute()
rp.args = []interface{}{}
return res
}
return true
@ -170,101 +153,101 @@ func asString(i interface{}) string {
return fmt.Sprintf("%s", i)
}
func (rce *RedisCommandExecutor) execute() bool {
l := len(rce.args)
func (rp *RedisParser) execute() bool {
l := len(rp.args)
if l < 1 {
log.Println("WARN: Too less argument for command.")
return false
}
cmd := strings.ToUpper(asString(rce.args[0]))
cmd := strings.ToUpper(asString(rp.args[0]))
switch cmd {
case "HGET":
if l < 3 {
log.Println("WARN: Missing argments for HGET.")
return false
}
hash, ok1 := rce.args[1].([]byte)
key, ok2 := rce.args[2].([]byte)
hash, ok1 := rp.args[1].([]byte)
key, ok2 := rp.args[2].([]byte)
if !ok1 || !ok2 {
log.Println("WARN: HGET data are not byte slices.")
return false
}
return rce.commands.Hget(hash, key)
return rp.commands.Hget(hash, key)
case "HSET":
if l < 4 {
log.Println("WARN: Missing argments for HSET.")
return false
}
hash, ok1 := rce.args[1].([]byte)
key, ok2 := rce.args[2].([]byte)
value, ok3 := rce.args[3].([]byte)
hash, ok1 := rp.args[1].([]byte)
key, ok2 := rp.args[2].([]byte)
value, ok3 := rp.args[3].([]byte)
if !ok1 || !ok2 || !ok3 {
log.Println("WARN: HSET data are not byte slices.")
return false
}
return rce.commands.Hset(hash, key, value)
return rp.commands.Hset(hash, key, value)
case "MULTI":
return rce.commands.Multi()
return rp.commands.Multi()
case "EXEC":
return rce.commands.Exec()
return rp.commands.Exec()
case "HKEYS":
if l < 2 {
log.Println("WARN: Missing argments for HKEYS.")
return false
}
hash, ok := rce.args[1].([]byte)
hash, ok := rp.args[1].([]byte)
if !ok {
log.Println("WARN: HKEYS data are not byte slices.")
return false
}
return rce.commands.Hkeys(hash)
return rp.commands.Hkeys(hash)
case "HSPATIAL":
if l < 4 {
log.Println("WARN: Missing argments for HSPATIAL.")
return false
}
hash, ok1 := rce.args[1].([]byte)
first, ok2 := rce.args[2].([]byte)
second, ok3 := rce.args[3].([]byte)
hash, ok1 := rp.args[1].([]byte)
first, ok2 := rp.args[2].([]byte)
second, ok3 := rp.args[3].([]byte)
if !ok1 || !ok2 || !ok3 {
log.Println("WARN: HSPATIAL data are not byte slices.")
return false
}
return rce.commands.HSpatial(hash, first, second)
return rp.commands.HSpatial(hash, first, second)
case "PING":
return rce.commands.Ping()
return rp.commands.Ping()
}
log.Printf("WARN: unknown command: '%s'\n", cmd)
return false
}
func (rce *RedisCommandExecutor) ConsumeSimpleString(s string) bool {
return rce.push(s)
func (rp *RedisParser) consumeSimpleString(s string) bool {
return rp.push(s)
}
func (rce *RedisCommandExecutor) ConsumeBulkString(data []byte) bool {
return rce.push(data)
func (rp *RedisParser) consumeBulkString(data []byte) bool {
return rp.push(data)
}
func (rce *RedisCommandExecutor) ConsumeInteger(i int64) bool {
return rce.push(i)
func (rp *RedisParser) consumeInteger(i int64) bool {
return rp.push(i)
}
func (rce *RedisCommandExecutor) ConsumeError(err error) bool {
func (rp *RedisParser) consumeError(err error) bool {
log.Printf("error: %s\n", err)
return true
}
func (rce *RedisCommandExecutor) ConsumeArray(i int64) bool {
if rce.missing > 0 {
func (rp *RedisParser) consumeArray(i int64) bool {
if rp.missing > 0 {
log.Println("WARN: Nested arrays are not supported!")
return false
}
@ -272,6 +255,6 @@ func (rce *RedisCommandExecutor) ConsumeArray(i int64) bool {
log.Println("Null arrays are not supported")
return false
}
rce.missing = i
rp.missing = i
return true
}