// Copyright 2014 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "bufio" "bytes" "fmt" "io" "log" "strconv" ) type RedisConsumer interface { ConsumeInteger(int64) bool ConsumeArray(int64) bool ConsumeSimpleString(string) bool ConsumeBulkString([]byte) bool ConsumeError(error) bool } type RedisParser struct { reader *bufio.Reader consumer RedisConsumer } func NewRedisParser(reader *bufio.Reader, consumer RedisConsumer) *RedisParser { return &RedisParser{ reader: reader, consumer: consumer} } func (rp *RedisParser) Parse() { for line := rp.nextLine(); line != nil && rp.dispatch(line); line = rp.nextLine() { } } func (rp *RedisParser) nextLine() []byte { line, err := rp.reader.ReadBytes('\n') if err != nil { if err != io.EOF { rp.consumer.ConsumeError(err) } return nil } return bytes.TrimRight(line, "\r\n") } func (rp *RedisParser) dispatch(line []byte) bool { if len(line) < 1 { return false } switch line[0] { case '-': return true // ignore errors case ':': return rp.integer(line) case '+': return rp.simpleString(line) case '$': return rp.bulkString(line) case '*': return rp.array(line) } return true } func (rp *RedisParser) simpleString(line []byte) bool { return rp.consumer.ConsumeSimpleString(string(line[1:])) } func (rp *RedisParser) integer(line []byte) bool { i, err := strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return rp.consumer.ConsumeError(err) } return rp.consumer.ConsumeInteger(i) } func (rp *RedisParser) bulkString(line []byte) bool { var i int64 var err error i, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return rp.consumer.ConsumeError(err) } switch { case i < 0: return rp.consumer.ConsumeBulkString(nil) case i == 0: return rp.consumer.ConsumeBulkString([]byte{}) default: data := make([]byte, i, i) var n int if n, err = rp.reader.Read(data); err != nil { return rp.consumer.ConsumeError(err) } if _, err = rp.reader.ReadBytes('\n'); err != nil { return rp.consumer.ConsumeError(err) } return rp.consumer.ConsumeBulkString(data[0:n]) } } func (rp *RedisParser) array(line []byte) bool { var i int64 var err error i, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return rp.consumer.ConsumeError(err) } return rp.consumer.ConsumeArray(i) } type RedisCommands interface { Hget(hash, key []byte) bool Hset(hash, key, block []byte) bool Multi() bool Exec() bool } type RedisCommandExecutor struct { commands RedisCommands missing int64 args []interface{} } func NewRedisCommandExecutor(commands RedisCommands) *RedisCommandExecutor { return &RedisCommandExecutor{ commands: commands, missing: 0, args: []interface{}{}} } func (rce *RedisCommandExecutor) push(i interface{}) bool { rce.args = append(rce.args, i) rce.missing-- if rce.missing <= 0 { rce.missing = 0 res := rce.execute() rce.args = []interface{}{} return res } return true } func asString(i interface{}) string { switch i.(type) { case string: return i.(string) case []byte: return string(i.([]byte)) } return fmt.Sprintf("%s", i) } func (rce *RedisCommandExecutor) execute() bool { l := len(rce.args) if l < 1 { log.Printf("WARN: Too less argument for command.") return false } cmd := asString(rce.args[0]) switch cmd { case "HGET": if l < 3 { log.Println("WARN: Missing argments for HGET.") return false } hash, ok1 := rce.args[1].([]byte) key, ok2 := rce.args[2].([]byte) if !ok1 || !ok2 { log.Println("WARN: HGET data are not byte slices.") return false } return rce.commands.Hget(hash, key) case "HSET": if l < 4 { log.Println("WARN: Missing argments for HSET.") return false } hash, ok1 := rce.args[1].([]byte) key, ok2 := rce.args[2].([]byte) value, ok3 := rce.args[3].([]byte) if !ok1 || !ok2 || !ok3 { log.Println("WARN: HSET data are not byte slices.") return false } return rce.commands.Hset(hash, key, value) case "MULTI": return rce.commands.Multi() case "EXEC": return rce.commands.Exec() } log.Printf("WARN: unknown command: '%s'\n", cmd) return false } func (rce *RedisCommandExecutor) ConsumeSimpleString(s string) bool { return rce.push(s) } func (rce *RedisCommandExecutor) ConsumeBulkString(data []byte) bool { return rce.push(data) } func (rce *RedisCommandExecutor) ConsumeInteger(i int64) bool { return rce.push(i) } func (rce *RedisCommandExecutor) ConsumeError(err error) bool { log.Printf("error: %s", err) return true } func (rce *RedisCommandExecutor) ConsumeArray(i int64) bool { if rce.missing > 0 { log.Println("WARN: Nested arrays are not supported!") return false } if i < 0 { log.Println("Null arrays are not supported") return false } rce.missing = i return true }