mirror of
https://bitbucket.org/s_l_teichmann/mtsatellite
synced 2025-01-11 17:30:18 +01:00
278 lines
5.9 KiB
Go
278 lines
5.9 KiB
Go
// Copyright 2014 by Sascha L. Teichmann
|
|
// Use of this source code is governed by the MIT license
|
|
// that can be found in the LICENSE file.
|
|
|
|
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"strconv"
|
|
"strings"
|
|
)
|
|
|
|
// RedisConsumer receives the elements that a RedisParser extracts from
// its input stream. The boolean returned by a callback that was invoked
// for a dispatched line is handed back to the parse loop; false stops
// the parsing.
type RedisConsumer interface {
	// ConsumeInteger is called with the numeric value of a ':' line.
	ConsumeInteger(value int64) bool
	// ConsumeArray is called with the element count of a '*' line.
	ConsumeArray(count int64) bool
	// ConsumeSimpleString is called with the text following '+'.
	ConsumeSimpleString(text string) bool
	// ConsumeBulkString is called with the payload of a '$' element.
	// The payload is nil when the announced length is negative.
	ConsumeBulkString(data []byte) bool
	// ConsumeError is called when reading or decoding fails.
	ConsumeError(err error) bool
}
|
|
|
|
type RedisParser struct {
|
|
reader *bufio.Reader
|
|
consumer RedisConsumer
|
|
maxBulkStringSize int64
|
|
}
|
|
|
|
func NewRedisParser(reader *bufio.Reader, consumer RedisConsumer,
|
|
maxBulkStringSize int64) *RedisParser {
|
|
return &RedisParser{
|
|
reader: reader,
|
|
consumer: consumer,
|
|
maxBulkStringSize: maxBulkStringSize}
|
|
}
|
|
|
|
func (rp *RedisParser) Parse() {
|
|
for line := rp.nextLine(); line != nil && rp.dispatch(line); line = rp.nextLine() {
|
|
}
|
|
}
|
|
|
|
func (rp *RedisParser) nextLine() []byte {
|
|
line, err := rp.reader.ReadBytes('\n')
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
rp.consumer.ConsumeError(err)
|
|
}
|
|
return nil
|
|
}
|
|
return bytes.TrimRight(line, "\r\n")
|
|
}
|
|
|
|
func (rp *RedisParser) dispatch(line []byte) bool {
|
|
if len(line) < 1 {
|
|
return false
|
|
}
|
|
switch line[0] {
|
|
case '-':
|
|
return true // ignore errors
|
|
case ':':
|
|
return rp.integer(line)
|
|
case '+':
|
|
return rp.simpleString(line)
|
|
case '$':
|
|
return rp.bulkString(line)
|
|
case '*':
|
|
return rp.array(line)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (rp *RedisParser) simpleString(line []byte) bool {
|
|
return rp.consumer.ConsumeSimpleString(string(line[1:]))
|
|
}
|
|
|
|
func (rp *RedisParser) integer(line []byte) bool {
|
|
i, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
|
if err != nil {
|
|
return rp.consumer.ConsumeError(err)
|
|
}
|
|
return rp.consumer.ConsumeInteger(i)
|
|
}
|
|
|
|
func (rp *RedisParser) bulkString(line []byte) bool {
|
|
var i int64
|
|
var err error
|
|
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
|
if err != nil {
|
|
return rp.consumer.ConsumeError(err)
|
|
}
|
|
switch {
|
|
case i < 0:
|
|
return rp.consumer.ConsumeBulkString(nil)
|
|
case i == 0:
|
|
return rp.consumer.ConsumeBulkString([]byte{})
|
|
default:
|
|
if i > rp.maxBulkStringSize { // prevent denial of service.
|
|
return rp.consumer.ConsumeError(
|
|
fmt.Errorf("Bulk string too large (%d bytes).\n", i))
|
|
}
|
|
data := make([]byte, i)
|
|
for rest := i; rest > 0; {
|
|
var n int
|
|
if n, err = rp.reader.Read(data[i-rest : i]); err != nil {
|
|
return rp.consumer.ConsumeError(err)
|
|
}
|
|
rest -= int64(n)
|
|
}
|
|
if _, err = rp.reader.ReadBytes('\n'); err != nil {
|
|
return rp.consumer.ConsumeError(err)
|
|
}
|
|
return rp.consumer.ConsumeBulkString(data)
|
|
}
|
|
}
|
|
|
|
func (rp *RedisParser) array(line []byte) bool {
|
|
var i int64
|
|
var err error
|
|
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
|
if err != nil {
|
|
return rp.consumer.ConsumeError(err)
|
|
}
|
|
return rp.consumer.ConsumeArray(i)
|
|
}
|
|
|
|
// RedisCommands is the set of commands a RedisCommandExecutor can
// trigger. The boolean result of a command becomes the result of the
// executor callback that completed the command.
type RedisCommands interface {
	// Hget is invoked for the HGET command.
	Hget(hash, key []byte) bool
	// Hset is invoked for the HSET command.
	Hset(hash, key, block []byte) bool
	// Multi is invoked for the MULTI command.
	Multi() bool
	// Exec is invoked for the EXEC command.
	Exec() bool
	// Hkeys is invoked for the HKEYS command.
	Hkeys(hash []byte) bool
	// HSpatial is invoked for the HSPATIAL command.
	HSpatial(hash, first, second []byte) bool
	// Ping is invoked for the PING command.
	Ping() bool
}
|
|
|
|
type RedisCommandExecutor struct {
|
|
commands RedisCommands
|
|
missing int64
|
|
args []interface{}
|
|
}
|
|
|
|
func NewRedisCommandExecutor(commands RedisCommands) *RedisCommandExecutor {
|
|
return &RedisCommandExecutor{
|
|
commands: commands,
|
|
missing: 0,
|
|
args: []interface{}{}}
|
|
}
|
|
|
|
func (rce *RedisCommandExecutor) push(i interface{}) bool {
|
|
rce.args = append(rce.args, i)
|
|
rce.missing--
|
|
if rce.missing <= 0 {
|
|
rce.missing = 0
|
|
res := rce.execute()
|
|
rce.args = []interface{}{}
|
|
return res
|
|
}
|
|
return true
|
|
}
|
|
|
|
// asString converts i to a string. Strings are returned unchanged and
// byte slices are converted directly. Every other value is rendered in
// its default format, so an int64 of 42 becomes "42" rather than the
// "%!s(int64=42)" that the %s verb would produce.
func asString(i interface{}) string {
	switch v := i.(type) {
	case string:
		return v
	case []byte:
		return string(v)
	}
	return fmt.Sprint(i)
}
|
|
|
|
func (rce *RedisCommandExecutor) execute() bool {
|
|
l := len(rce.args)
|
|
if l < 1 {
|
|
log.Println("WARN: Too less argument for command.")
|
|
return false
|
|
}
|
|
cmd := strings.ToUpper(asString(rce.args[0]))
|
|
switch cmd {
|
|
case "HGET":
|
|
if l < 3 {
|
|
log.Println("WARN: Missing argments for HGET.")
|
|
return false
|
|
}
|
|
hash, ok1 := rce.args[1].([]byte)
|
|
key, ok2 := rce.args[2].([]byte)
|
|
if !ok1 || !ok2 {
|
|
log.Println("WARN: HGET data are not byte slices.")
|
|
return false
|
|
}
|
|
return rce.commands.Hget(hash, key)
|
|
|
|
case "HSET":
|
|
if l < 4 {
|
|
log.Println("WARN: Missing argments for HSET.")
|
|
return false
|
|
}
|
|
hash, ok1 := rce.args[1].([]byte)
|
|
key, ok2 := rce.args[2].([]byte)
|
|
value, ok3 := rce.args[3].([]byte)
|
|
|
|
if !ok1 || !ok2 || !ok3 {
|
|
log.Println("WARN: HSET data are not byte slices.")
|
|
return false
|
|
}
|
|
return rce.commands.Hset(hash, key, value)
|
|
|
|
case "MULTI":
|
|
return rce.commands.Multi()
|
|
|
|
case "EXEC":
|
|
return rce.commands.Exec()
|
|
|
|
case "HKEYS":
|
|
if l < 2 {
|
|
log.Println("WARN: Missing argments for HKEYS.")
|
|
return false
|
|
}
|
|
hash, ok := rce.args[1].([]byte)
|
|
if !ok {
|
|
log.Println("WARN: HKEYS data are not byte slices.")
|
|
return false
|
|
}
|
|
return rce.commands.Hkeys(hash)
|
|
|
|
case "HSPATIAL":
|
|
if l < 4 {
|
|
log.Println("WARN: Missing argments for HSPATIAL.")
|
|
return false
|
|
}
|
|
hash, ok1 := rce.args[1].([]byte)
|
|
first, ok2 := rce.args[2].([]byte)
|
|
second, ok3 := rce.args[3].([]byte)
|
|
|
|
if !ok1 || !ok2 || !ok3 {
|
|
log.Println("WARN: HSPATIAL data are not byte slices.")
|
|
return false
|
|
}
|
|
return rce.commands.HSpatial(hash, first, second)
|
|
|
|
case "PING":
|
|
return rce.commands.Ping()
|
|
}
|
|
log.Printf("WARN: unknown command: '%s'\n", cmd)
|
|
return false
|
|
}
|
|
|
|
func (rce *RedisCommandExecutor) ConsumeSimpleString(s string) bool {
|
|
return rce.push(s)
|
|
}
|
|
|
|
func (rce *RedisCommandExecutor) ConsumeBulkString(data []byte) bool {
|
|
return rce.push(data)
|
|
}
|
|
|
|
func (rce *RedisCommandExecutor) ConsumeInteger(i int64) bool {
|
|
return rce.push(i)
|
|
}
|
|
|
|
func (rce *RedisCommandExecutor) ConsumeError(err error) bool {
|
|
log.Printf("error: %s\n", err)
|
|
return true
|
|
}
|
|
|
|
func (rce *RedisCommandExecutor) ConsumeArray(i int64) bool {
|
|
if rce.missing > 0 {
|
|
log.Println("WARN: Nested arrays are not supported!")
|
|
return false
|
|
}
|
|
if i < 0 {
|
|
log.Println("Null arrays are not supported")
|
|
return false
|
|
}
|
|
rce.missing = i
|
|
return true
|
|
}
|