mtsatellite/cmd/mtredisalize/redisparser.go

261 lines
5.3 KiB
Go
Raw Normal View History

// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"strconv"
"strings"
)
// RedisCommands is the set of commands the parser can hand over once a
// complete command has been read. Each method returns a bool that the
// parser passes back up to its Parse loop: false stops the parsing.
type RedisCommands interface {
// Hget is called for the HGET command with its hash and key arguments.
Hget(hash, key []byte) bool
// Hset is called for the HSET command with hash, key and the value.
Hset(hash, key, block []byte) bool
// Multi is called for the MULTI command.
Multi() bool
// Exec is called for the EXEC command.
Exec() bool
// Hkeys is called for the HKEYS command with its hash argument.
Hkeys(hash []byte) bool
// HSpatial is called for the HSPATIAL command with the hash and the
// two further arguments of the command.
HSpatial(hash, first, second []byte) bool
// Ping is called for the PING command.
Ping() bool
}
// RedisParser reads protocol lines from a buffered reader, collects the
// elements of a command and forwards complete commands to a
// RedisCommands implementation.
type RedisParser struct {
// reader is the input the protocol lines and bulk payloads are read from.
reader *bufio.Reader
// commands receives the commands once all their arguments are collected.
commands RedisCommands
// missing is the number of elements still expected for the current array.
missing int64
// args holds the elements collected so far for the current command.
args []interface{}
// maxBulkStringSize is the largest bulk string length that is accepted.
maxBulkStringSize int64
}
// NewRedisParser creates a parser which reads from reader, forwards
// complete commands to commands and rejects bulk strings that are
// longer than maxBulkStringSize bytes.
func NewRedisParser(reader *bufio.Reader,
	commands RedisCommands,
	maxBulkStringSize int64) *RedisParser {

	parser := new(RedisParser)
	parser.reader = reader
	parser.commands = commands
	parser.maxBulkStringSize = maxBulkStringSize
	return parser
}
// Parse reads and dispatches lines until no further line is available
// or the dispatching of a line reports that parsing should stop.
func (rp *RedisParser) Parse() {
	for {
		line := rp.nextLine()
		if line == nil || !rp.dispatch(line) {
			return
		}
	}
}
func (rp *RedisParser) nextLine() []byte {
line, err := rp.reader.ReadBytes('\n')
if err != nil {
if err != io.EOF {
rp.consumeError(err)
}
return nil
}
2014-08-03 14:52:24 +02:00
return bytes.TrimRight(line, "\r\n")
}
// dispatch looks at the first byte of line and forwards the line to the
// handler for that kind of protocol element. An empty line stops the
// parsing (false). Lines starting with '-' and lines with an unknown
// first byte are skipped and parsing goes on (true).
func (rp *RedisParser) dispatch(line []byte) bool {
	if len(line) == 0 {
		return false
	}
	switch marker := line[0]; marker {
	case '*':
		return rp.array(line)
	case '$':
		return rp.bulkString(line)
	case '+':
		return rp.simpleString(line)
	case ':':
		return rp.integer(line)
	default:
		// '-' (errors) and everything unknown is ignored.
		return true
	}
}
// simpleString strips the leading type byte from line and consumes the
// remainder as a simple string.
func (rp *RedisParser) simpleString(line []byte) bool {
	payload := line[1:]
	return rp.consumeSimpleString(string(payload))
}
// integer parses the text after the leading type byte of line as a
// base 10 64 bit integer and consumes it. A malformed number is
// reported through consumeError.
func (rp *RedisParser) integer(line []byte) bool {
	value, err := strconv.ParseInt(string(line[1:]), 10, 64)
	if err == nil {
		return rp.consumeInteger(value)
	}
	return rp.consumeError(err)
}
// bulkString handles a bulk string header line. The text after the
// leading type byte is the payload length in bytes.
//
// A negative length is consumed as a nil byte slice; no payload is read.
// Otherwise the payload of the given length is read from the input
// followed by the rest of the line that terminates the payload. This
// includes the zero length case, where the terminating line has to be
// skipped as well: leaving it unread would make the next line empty and
// dispatch would stop the parsing.
//
// Lengths above maxBulkStringSize, malformed lengths and read failures
// are reported through consumeError, whose result is returned.
func (rp *RedisParser) bulkString(line []byte) bool {
	size, err := strconv.ParseInt(string(line[1:]), 10, 64)
	if err != nil {
		return rp.consumeError(err)
	}
	if size < 0 {
		return rp.consumeBulkString(nil)
	}

	data := []byte{}
	if size > 0 {
		if size > rp.maxBulkStringSize { // prevent denial of service.
			// NOTE(review): the oversized payload is not skipped here, so
			// the following input is parsed as protocol lines.
			return rp.consumeError(
				fmt.Errorf("Bulk string too large (%d bytes).\n", size))
		}
		data = make([]byte, size)
		// io.ReadFull keeps reading until data is filled and turns a
		// short read into an error instead of losing the read bytes.
		if _, err = io.ReadFull(rp.reader, data); err != nil {
			return rp.consumeError(err)
		}
	}

	// Skip the line end which follows the payload.
	if _, err = rp.reader.ReadBytes('\n'); err != nil {
		return rp.consumeError(err)
	}
	return rp.consumeBulkString(data)
}
// array parses the text after the leading type byte of line as the
// number of elements of an array and consumes it. A malformed number is
// reported through consumeError.
func (rp *RedisParser) array(line []byte) bool {
	count, err := strconv.ParseInt(string(line[1:]), 10, 64)
	if err == nil {
		return rp.consumeArray(count)
	}
	return rp.consumeError(err)
}
// push appends i to the collected arguments and counts down the number
// of missing elements. As long as elements are missing it returns true.
// Once none are missing the collected arguments are executed, the
// argument list is reset and the result of the execution is returned.
func (rp *RedisParser) push(i interface{}) bool {
	rp.args = append(rp.args, i)
	rp.missing--
	if rp.missing > 0 {
		return true
	}
	rp.missing = 0
	ok := rp.execute()
	rp.args = []interface{}{}
	return ok
}
// asString converts i to a string. Strings are returned as they are and
// byte slices are converted directly. Every other value is rendered in
// its default format, so e.g. an int64 42 becomes "42" and not the
// "%!s(int64=42)" a %s verb would produce for it.
func asString(i interface{}) string {
	switch v := i.(type) {
	case string:
		return v
	case []byte:
		return string(v)
	}
	return fmt.Sprint(i)
}
func (rp *RedisParser) execute() bool {
l := len(rp.args)
if l < 1 {
log.Println("WARN: Too less argument for command.")
return false
}
cmd := strings.ToUpper(asString(rp.args[0]))
switch cmd {
case "HGET":
if l < 3 {
log.Println("WARN: Missing argments for HGET.")
return false
}
hash, ok1 := rp.args[1].([]byte)
key, ok2 := rp.args[2].([]byte)
if !ok1 || !ok2 {
log.Println("WARN: HGET data are not byte slices.")
return false
}
return rp.commands.Hget(hash, key)
case "HSET":
if l < 4 {
log.Println("WARN: Missing argments for HSET.")
return false
}
hash, ok1 := rp.args[1].([]byte)
key, ok2 := rp.args[2].([]byte)
value, ok3 := rp.args[3].([]byte)
if !ok1 || !ok2 || !ok3 {
log.Println("WARN: HSET data are not byte slices.")
return false
}
return rp.commands.Hset(hash, key, value)
case "MULTI":
return rp.commands.Multi()
case "EXEC":
return rp.commands.Exec()
case "HKEYS":
if l < 2 {
log.Println("WARN: Missing argments for HKEYS.")
return false
}
hash, ok := rp.args[1].([]byte)
if !ok {
log.Println("WARN: HKEYS data are not byte slices.")
return false
}
return rp.commands.Hkeys(hash)
case "HSPATIAL":
if l < 4 {
log.Println("WARN: Missing argments for HSPATIAL.")
return false
}
hash, ok1 := rp.args[1].([]byte)
first, ok2 := rp.args[2].([]byte)
second, ok3 := rp.args[3].([]byte)
if !ok1 || !ok2 || !ok3 {
2014-09-07 17:29:42 +02:00
log.Println("WARN: HSPATIAL data are not byte slices.")
return false
}
return rp.commands.HSpatial(hash, first, second)
case "PING":
return rp.commands.Ping()
}
log.Printf("WARN: unknown command: '%s'\n", cmd)
return false
}
// consumeSimpleString pushes the simple string s as the next collected
// argument and returns the result of the push.
func (rp *RedisParser) consumeSimpleString(s string) bool {
	ok := rp.push(s)
	return ok
}
// consumeBulkString pushes the bulk string payload data as the next
// collected argument and returns the result of the push.
func (rp *RedisParser) consumeBulkString(data []byte) bool {
	ok := rp.push(data)
	return ok
}
// consumeInteger pushes the integer i as the next collected argument
// and returns the result of the push.
func (rp *RedisParser) consumeInteger(i int64) bool {
	ok := rp.push(i)
	return ok
}
// consumeError writes err to the log. It always returns true, so callers
// which return its result keep the parsing going.
func (rp *RedisParser) consumeError(err error) bool {
	const keepParsing = true
	log.Printf("error: %s\n", err)
	return keepParsing
}
// consumeArray starts the collection of an array with i elements by
// storing i as the number of missing elements. It logs a message and
// returns false if elements of a previous array are still missing
// (nested array) or if i is negative (null array).
func (rp *RedisParser) consumeArray(i int64) bool {
	switch {
	case rp.missing > 0:
		log.Println("WARN: Nested arrays are not supported!")
		return false
	case i < 0:
		log.Println("Null arrays are not supported")
		return false
	}
	rp.missing = i
	return true
}