mtsatellite/cmd/mtredisalize/connection.go
2016-07-26 16:32:24 +02:00

242 lines
4.9 KiB
Go

// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package main
import (
"bufio"
"log"
"net"
"strconv"
)
// Pre-encoded replies in the Redis wire format. They are written to the
// client verbatim and must never be modified.
var (
	// Status replies.
	redisOk     = []byte("+OK\r\n")
	redisPong   = []byte("+PONG\r\n")
	redisQueued = []byte("+QUEUED\r\n")

	// Generic error reply; it carries no message text.
	redisError = []byte("-ERR\r\n")

	// Integer replies used to transport boolean results.
	redisTrue  = []byte(":1\r\n")
	redisFalse = []byte(":0\r\n")

	// redisNoSuchBlock is sent by writeBulkString in place of a bulk
	// string when the data is nil.
	redisNoSuchBlock = []byte("$-1\r\n")

	// redisEmptyArray is an array reply with zero elements.
	redisEmptyArray = []byte("*0\r\n")

	// redisCrnl terminates the payload of a bulk string.
	redisCrnl = []byte("\r\n")
)
// Connection serves a single client: it couples the network connection the
// replies are written to with the backend session the commands are
// executed on.
type Connection struct {
	// conn is the client connection. Run reads the requests from it and
	// every reply is written to it.
	conn net.Conn
	// session is the backend that carries out the commands.
	session Session
	// maxBulkStringSize is handed to the request parser created in Run.
	// NOTE(review): presumably the largest bulk string the parser accepts
	// - confirm against NewRedisParser.
	maxBulkStringSize int64
	// boolArray collects the results of Hset calls made while a
	// transaction is open. Exec sends them to the client and resets it.
	boolArray []bool
}
// NewConnection creates a Connection that serves the client behind conn
// and executes its commands on session. maxBulkStringSize is forwarded to
// the request parser when Run is called. The list of pending transaction
// results starts out empty.
func NewConnection(conn net.Conn, session Session, maxBulkStringSize int64) *Connection {
	c := new(Connection)
	c.conn = conn
	c.session = session
	c.maxBulkStringSize = maxBulkStringSize
	c.boolArray = []bool{}
	return c
}
// Run processes the requests of the client until the parser returns.
// Afterwards the backend session and then the network connection are
// closed and the disconnect is logged.
func (c *Connection) Run() {
	// Size of the buffer in front of the network connection.
	const readBufferSize = 8 * 1024

	// Clean up in a fixed order: the session first, the socket second.
	defer func() {
		c.session.Close()
		c.conn.Close()
	}()

	reader := bufio.NewReaderSize(c.conn, readBufferSize)
	// The connection itself is passed to the parser along with the reader.
	p := NewRedisParser(reader, c, c.maxBulkStringSize)
	p.Parse()

	log.Println("client disconnected")
}
// logError reports whether err is nil. A non-nil error is written to the
// standard logger with an "ERROR: " prefix before false is returned.
func logError(err error) bool {
	if err == nil {
		return true
	}
	log.Printf("ERROR: %s\n", err)
	return false
}
// Hdel asks the session to delete key from hash. The flag returned by the
// session is sent to the client as :1 or :0; a session error is answered
// with the generic error reply. The result tells whether the reply could
// be written to the connection.
func (c *Connection) Hdel(hash, key []byte) bool {
	deleted, err := c.session.Del(hash, key)
	if err == nil {
		return c.writeBool(deleted)
	}
	return c.writeError(err)
}
// Hget fetches the value stored under key in hash from the session and
// sends it to the client as a bulk string. A nil value is answered with
// the "$-1" reply, a session error with the generic error reply. The
// result tells whether the reply could be written to the connection.
func (c *Connection) Hget(hash, key []byte) bool {
	block, err := c.session.Fetch(hash, key)
	if err != nil {
		return c.writeError(err)
	}
	return c.writeBlock(block)
}
// Hset stores data under key in hash via the session. Outside of a
// transaction the flag returned by the session is sent to the client
// right away as :1 or :0. Inside a transaction the flag is remembered for
// the later Exec and the client only gets +QUEUED. A session error is
// answered with the generic error reply. The result tells whether the
// reply could be written to the connection.
func (c *Connection) Hset(hash, key, data []byte) bool {
	existed, err := c.session.Store(hash, key, data)
	if err != nil {
		return c.writeError(err)
	}

	if !c.session.InTransaction() {
		return c.writeBool(existed)
	}

	// The real answer is delivered as part of the Exec reply.
	c.boolArray = append(c.boolArray, existed)
	return c.writeQueued()
}
// Multi opens a transaction on the session. If one is already running
// only a warning is logged and the running transaction is kept. In both
// cases the client receives +OK; if the session fails to begin the
// transaction the generic error reply is sent instead. The result tells
// whether the reply could be written to the connection.
func (c *Connection) Multi() bool {
	if c.session.InTransaction() {
		log.Println("WARN: Already running transaction.")
		return c.writeOk()
	}

	if err := c.session.BeginTransaction(); err != nil {
		return c.writeError(err)
	}
	return c.writeOk()
}
// Exec commits the running transaction and sends the results collected by
// Hset since it was opened as an array of :1/:0 replies. Without a running
// transaction an empty array is sent. If the commit fails the client gets
// the generic error reply. The result tells whether the reply could be
// written to the connection.
func (c *Connection) Exec() bool {
	if !c.session.InTransaction() {
		return c.writeEmptyArray()
	}

	// Detach the collected results before committing: the list is reset
	// regardless of whether the commit succeeds.
	pending := c.boolArray
	c.boolArray = []bool{}

	err := c.session.CommitTransaction()
	if err != nil {
		return c.writeError(err)
	}
	return c.writeBoolArray(pending)
}
// Hkeys streams all keys of hash to the client as an array of bulk
// strings. The session supplies the number of keys, which is sent as the
// array header, and a channel the keys are read from. Zero keys are
// answered with the empty array, a session error with the generic error
// reply. The result is false as soon as a write to the connection fails.
func (c *Connection) Hkeys(hash []byte) bool {
	// done is closed when this method returns.
	// NOTE(review): presumably this tells the session to stop producing
	// keys when the transfer is aborted early - confirm against AllKeys.
	done := make(chan struct{})
	defer close(done)

	keys, n, err := c.session.AllKeys(hash, done)
	if err != nil {
		return c.writeError(err)
	}
	if n == 0 {
		return c.writeEmptyArray()
	}

	header := redisLength('*', n)
	if _, err := c.conn.Write(header); err != nil {
		return logError(err)
	}

	for key := range keys {
		if err := c.writeBulkString(key); err != nil {
			return logError(err)
		}
	}
	return true
}
// Ping answers the client with the +PONG reply. The result tells whether
// the reply could be written to the connection.
func (c *Connection) Ping() bool {
	return c.writeMessage(redisPong)
}
// HSpatial runs a spatial query on hash, bounded by first and second, and
// streams the result to the client. Every block received from the session
// is sent as two bulk strings, its key followed by its data. The end of
// the stream is marked with the "$-1" reply. A session error is answered
// with the generic error reply. The result is false as soon as a write to
// the connection fails.
func (c *Connection) HSpatial(hash, first, second []byte) bool {
	// done is closed when this method returns.
	// NOTE(review): presumably this tells the session to stop producing
	// blocks when the transfer is aborted early - confirm against
	// SpatialQuery.
	done := make(chan struct{})
	defer close(done)

	blocks, err := c.session.SpatialQuery(hash, first, second, done)
	if err != nil {
		return c.writeError(err)
	}

	for block := range blocks {
		// The data is only sent if the key went out successfully.
		err := c.writeBulkString(block.Key)
		if err == nil {
			err = c.writeBulkString(block.Data)
		}
		if err != nil {
			return logError(err)
		}
	}

	// A nil bulk string terminates the stream.
	terminatorErr := c.writeBulkString(nil)
	return logError(terminatorErr)
}
// writeError logs err on the server side and sends the generic -ERR reply
// to the client; the text of err is not transmitted. The result tells
// whether the reply could be written to the connection.
func (c *Connection) writeError(err error) bool {
	// The result of logError is not used here; the return value depends
	// only on the write below.
	logError(err)
	return c.writeMessage(redisError)
}
// writeEmptyArray sends the "*0" array reply to the client. The result
// tells whether the reply could be written to the connection.
func (c *Connection) writeEmptyArray() bool {
	return c.writeMessage(redisEmptyArray)
}
// writeBool sends b to the client as an integer reply: ":1" for true and
// ":0" for false. The result tells whether the reply could be written to
// the connection.
func (c *Connection) writeBool(b bool) bool {
	reply := redisFalse
	if b {
		reply = redisTrue
	}
	return c.writeMessage(reply)
}
// redisLength builds a length header: the prefix byte, followed by s in
// decimal notation and a trailing "\r\n". It is used with '*' for arrays
// and '$' for bulk strings.
func redisLength(prefix byte, s int) []byte {
	// 16 bytes of capacity hold the prefix, the line ending and any
	// number of up to 13 characters without growing the buffer.
	out := make([]byte, 1, 16)
	out[0] = prefix
	out = strconv.AppendInt(out, int64(s), 10)
	return append(out, '\r', '\n')
}
// writeBoolArray sends arr to the client as an array reply: a "*<len>"
// header followed by one ":1" or ":0" per element. The result is false as
// soon as a write to the connection fails.
func (c *Connection) writeBoolArray(arr []bool) bool {
	header := redisLength('*', len(arr))
	if _, err := c.conn.Write(header); err != nil {
		return logError(err)
	}

	for i := range arr {
		if ok := c.writeBool(arr[i]); !ok {
			return false
		}
	}
	return true
}
// writeMessage writes msg unchanged to the client connection. A failed
// write is logged. The result tells whether the write succeeded.
func (c *Connection) writeMessage(msg []byte) bool {
	if _, err := c.conn.Write(msg); err != nil {
		return logError(err)
	}
	return true
}
// writeOk sends the +OK reply to the client. The result tells whether the
// reply could be written to the connection.
func (c *Connection) writeOk() bool {
	return c.writeMessage(redisOk)
}
// writeQueued sends the +QUEUED reply to the client. The result tells
// whether the reply could be written to the connection.
func (c *Connection) writeQueued() bool {
	return c.writeMessage(redisQueued)
}
// writeBlock sends data to the client as a bulk string, or as the "$-1"
// reply if data is nil. A failed write is logged. The result tells
// whether the reply could be written to the connection.
func (c *Connection) writeBlock(data []byte) bool {
	err := c.writeBulkString(data)
	return logError(err)
}
// writeBulkString sends data to the client as a bulk string: a "$<len>"
// header, the raw bytes and a closing "\r\n". Nil data is sent as the
// "$-1" reply instead; empty but non-nil data is sent as a bulk string of
// length zero. The first write error is returned unlogged and aborts the
// remaining writes.
func (c *Connection) writeBulkString(data []byte) (err error) {
	if data == nil {
		_, err = c.conn.Write(redisNoSuchBlock)
		return err
	}

	// Header, payload and terminator are written one after the other.
	segments := [][]byte{
		redisLength('$', len(data)),
		data,
		redisCrnl,
	}
	for _, segment := range segments {
		if _, err = c.conn.Write(segment); err != nil {
			return err
		}
	}
	return nil
}