mtsatellite/parser.go

254 lines
5.3 KiB
Go

// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"strconv"
)
// RedisConsumer receives the protocol elements a RedisParser
// recognizes in its input. The bool returned by the methods is handed
// back by the parser's line handlers; a false result from handling a
// line ends RedisParser.Parse.
type RedisConsumer interface {
	// ConsumeSimpleString is called with the text following the
	// '+' marker of a line.
	ConsumeSimpleString(text string) bool
	// ConsumeBulkString is called with the payload of a '$' element.
	// The payload is nil if the announced length was negative.
	ConsumeBulkString(payload []byte) bool
	// ConsumeInteger is called with the number following the
	// ':' marker of a line.
	ConsumeInteger(value int64) bool
	// ConsumeArray is called with the number following the
	// '*' marker of a line.
	ConsumeArray(length int64) bool
	// ConsumeError is called if reading or number parsing failed.
	ConsumeError(err error) bool
}
// RedisParser reads protocol lines from a buffered reader and reports
// the elements it recognizes to a RedisConsumer.
type RedisParser struct {
// reader is the buffered input the lines and bulk payloads are read from.
reader *bufio.Reader
// consumer receives the parsed elements and any errors.
consumer RedisConsumer
}
// NewRedisParser creates a RedisParser which reads from reader and
// reports everything it parses to consumer.
func NewRedisParser(reader *bufio.Reader, consumer RedisConsumer) *RedisParser {
	parser := new(RedisParser)
	parser.reader = reader
	parser.consumer = consumer
	return parser
}
// Parse handles the input line by line. It returns when no further
// line can be read or when the handling of a line reports false.
func (rp *RedisParser) Parse() {
	for {
		line := rp.nextLine()
		if line == nil || !rp.dispatch(line) {
			return
		}
	}
}
// nextLine reads up to the next '\n' and returns the line with its
// trailing '\r' and '\n' bytes removed. If the read fails nil is
// returned; failures other than io.EOF are reported to the consumer
// first.
func (rp *RedisParser) nextLine() []byte {
	raw, err := rp.reader.ReadBytes('\n')
	if err == nil {
		return bytes.TrimRight(raw, "\r\n")
	}
	if err != io.EOF {
		rp.consumer.ConsumeError(err)
	}
	return nil
}
// dispatch hands line to the handler selected by its first byte and
// returns the handler's result. An empty line yields false. Lines
// starting with '-' (errors) or with an unknown marker are ignored
// and yield true.
func (rp *RedisParser) dispatch(line []byte) bool {
	if len(line) == 0 {
		return false
	}
	switch marker := line[0]; marker {
	case '*':
		return rp.array(line)
	case '$':
		return rp.bulkString(line)
	case '+':
		return rp.simpleString(line)
	case ':':
		return rp.integer(line)
	default:
		// Covers '-' (errors are ignored) and every unknown marker.
		return true
	}
}
// simpleString passes everything after the leading marker byte of
// line to the consumer as a simple string.
func (rp *RedisParser) simpleString(line []byte) bool {
	text := string(line[1:])
	return rp.consumer.ConsumeSimpleString(text)
}
// integer parses everything after the leading marker byte of line as
// a base 10 int64 and passes it to the consumer. If the text is not
// a valid number the parse error is passed to the consumer instead.
func (rp *RedisParser) integer(line []byte) bool {
	value, err := strconv.ParseInt(string(line[1:]), 10, 64)
	if err == nil {
		return rp.consumer.ConsumeInteger(value)
	}
	return rp.consumer.ConsumeError(err)
}
// bulkString handles a '$' line. The text after the marker byte is
// the payload length:
//
//   - If it is not a valid number the parse error goes to the consumer.
//   - A negative length denotes a null bulk string; the consumer
//     receives nil and nothing more is read.
//   - A length of 4 MiB or more is rejected with an error.
//   - Otherwise exactly that many bytes are read as the payload,
//     followed by the rest of the input up to and including the next
//     '\n' (the payload terminator).
//
// The terminator is consumed for empty payloads as well. Leaving it in
// the input would make the next line read come back empty, which ends
// Parse.
//
// The return value is the one of the consumer method called.
func (rp *RedisParser) bulkString(line []byte) bool {
	size, err := strconv.ParseInt(string(line[1:]), 10, 64)
	if err != nil {
		return rp.consumer.ConsumeError(err)
	}
	if size < 0 {
		return rp.consumer.ConsumeBulkString(nil)
	}
	if size >= 4*1024*1024 { // 4MB should be sufficient
		// NOTE(review): the oversized payload is not skipped here, so
		// the following line reads start inside of it.
		return rp.consumer.ConsumeError(
			fmt.Errorf("Bulk string too large (%d bytes).\n", size))
	}
	// For size == 0 this is an empty, non-nil slice and ReadFull
	// returns at once.
	data := make([]byte, size)
	if _, err = io.ReadFull(rp.reader, data); err != nil {
		return rp.consumer.ConsumeError(err)
	}
	// Drop the terminator following the payload.
	if _, err = rp.reader.ReadBytes('\n'); err != nil {
		return rp.consumer.ConsumeError(err)
	}
	return rp.consumer.ConsumeBulkString(data)
}
// array parses everything after the leading marker byte of line as
// a base 10 int64 and passes it to the consumer as an array length.
// If the text is not a valid number the parse error is passed to the
// consumer instead.
func (rp *RedisParser) array(line []byte) bool {
	length, err := strconv.ParseInt(string(line[1:]), 10, 64)
	if err == nil {
		return rp.consumer.ConsumeArray(length)
	}
	return rp.consumer.ConsumeError(err)
}
// RedisCommands is the set of operations a RedisCommandExecutor
// invokes for the commands it understands. The bool returned by a
// method becomes the result of executing the command.
type RedisCommands interface {
	// Multi is invoked for a MULTI command.
	Multi() bool
	// Exec is invoked for an EXEC command.
	Exec() bool
	// Hkeys is invoked for a HKEYS command with its first argument.
	Hkeys(hash []byte) bool
	// Hget is invoked for a HGET command with its first two arguments.
	Hget(hash, key []byte) bool
	// Hset is invoked for a HSET command with its first three arguments.
	Hset(hash, key, block []byte) bool
}
// RedisCommandExecutor collects the values it consumes as command
// arguments and, once a command is complete, invokes the matching
// method of a RedisCommands implementation.
type RedisCommandExecutor struct {
// commands is the implementation the recognized commands are forwarded to.
commands RedisCommands
// missing is the number of values still expected before the collected
// arguments are executed. It is set by ConsumeArray and counted down by push.
missing int64
// args holds the values collected for the current command;
// args[0] is used as the command name.
args []interface{}
}
// NewRedisCommandExecutor creates a RedisCommandExecutor forwarding
// to commands. It starts with no expected values and an empty
// argument list.
func NewRedisCommandExecutor(commands RedisCommands) *RedisCommandExecutor {
	executor := new(RedisCommandExecutor) // missing starts at its zero value
	executor.commands = commands
	executor.args = []interface{}{}
	return executor
}
// push appends i to the collected arguments and counts down the
// number of expected values. While values are still expected it
// returns true. Otherwise the collected arguments are executed, the
// argument list is replaced by a fresh empty one and the result of
// the execution is returned.
func (rce *RedisCommandExecutor) push(i interface{}) bool {
	rce.args = append(rce.args, i)
	rce.missing--
	if rce.missing > 0 {
		return true
	}
	rce.missing = 0
	ok := rce.execute()
	rce.args = []interface{}{}
	return ok
}
// asString returns a textual form of i. Strings are returned as they
// are and byte slices are converted to strings. Every other value is
// rendered in its default format, so e.g. int64(42) becomes "42"
// instead of the "%!s(int64=42)" the %s verb would produce.
func asString(i interface{}) string {
	switch v := i.(type) {
	case string:
		return v
	case []byte:
		return string(v)
	default:
		return fmt.Sprint(v)
	}
}
// execute runs the command described by the collected arguments.
// The first argument names the command (HGET, HSET, MULTI, EXEC or
// HKEYS, matched exactly); the following ones are its parameters and
// have to be byte slices. Surplus arguments are ignored.
//
// It returns the result of the invoked RedisCommands method. If there
// are no arguments, too few arguments, arguments which are not byte
// slices, or the command is unknown, a warning is logged and false is
// returned.
func (rce *RedisCommandExecutor) execute() bool {
	argc := len(rce.args)
	if argc == 0 {
		log.Printf("WARN: Too less argument for command.")
		return false
	}

	// blob returns argument n as a byte slice, if it is one.
	blob := func(n int) ([]byte, bool) {
		data, ok := rce.args[n].([]byte)
		return data, ok
	}

	switch name := asString(rce.args[0]); name {
	case "MULTI":
		return rce.commands.Multi()

	case "EXEC":
		return rce.commands.Exec()

	case "HKEYS":
		if argc < 2 {
			log.Println("WARN: Missing argments for HKEYS.")
			return false
		}
		if hash, ok := blob(1); ok {
			return rce.commands.Hkeys(hash)
		}
		log.Println("WARN: HKEYS data are not byte slices.")
		return false

	case "HGET":
		if argc < 3 {
			log.Println("WARN: Missing argments for HGET.")
			return false
		}
		hash, hashOK := blob(1)
		key, keyOK := blob(2)
		if !(hashOK && keyOK) {
			log.Println("WARN: HGET data are not byte slices.")
			return false
		}
		return rce.commands.Hget(hash, key)

	case "HSET":
		if argc < 4 {
			log.Println("WARN: Missing argments for HSET.")
			return false
		}
		hash, hashOK := blob(1)
		key, keyOK := blob(2)
		value, valueOK := blob(3)
		if !(hashOK && keyOK && valueOK) {
			log.Println("WARN: HSET data are not byte slices.")
			return false
		}
		return rce.commands.Hset(hash, key, value)

	default:
		log.Printf("WARN: unknown command: '%s'\n", name)
		return false
	}
}
// ConsumeSimpleString adds the simple string s to the arguments of
// the current command and returns the result of doing so.
func (rce *RedisCommandExecutor) ConsumeSimpleString(s string) bool {
	var arg interface{} = s
	return rce.push(arg)
}
// ConsumeBulkString adds the bulk string data to the arguments of
// the current command and returns the result of doing so.
func (rce *RedisCommandExecutor) ConsumeBulkString(data []byte) bool {
	var arg interface{} = data
	return rce.push(arg)
}
// ConsumeInteger adds the integer i to the arguments of the current
// command and returns the result of doing so.
func (rce *RedisCommandExecutor) ConsumeInteger(i int64) bool {
	var arg interface{} = i
	return rce.push(arg)
}
// ConsumeError logs err. It always returns true; the collected
// arguments are left untouched.
func (rce *RedisCommandExecutor) ConsumeError(err error) bool {
	const keepGoing = true
	log.Printf("error: %s", err)
	return keepGoing
}
// ConsumeArray records i as the number of values expected for the
// next command and returns true. An array arriving while values are
// still expected (a nested array) and a negative length (a null
// array) are not supported: a message is logged, the state is left
// unchanged and false is returned.
func (rce *RedisCommandExecutor) ConsumeArray(i int64) bool {
	switch {
	case rce.missing > 0:
		log.Println("WARN: Nested arrays are not supported!")
		return false
	case i < 0:
		log.Println("Null arrays are not supported")
		return false
	}
	rce.missing = i
	return true
}