// Copyright 2014 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "bufio" "log" "net" "strconv" ) var ( redisOk = []byte("+OK\r\n") redisPong = []byte("+PONG\r\n") redisError = []byte("-ERR\r\n") redisNoSuchBlock = []byte("$-1\r\n") redisCrnl = []byte("\r\n") redisEmptyArray = []byte("*0\r\n") redisQueued = []byte("+QUEUED\r\n") redisTrue = []byte(":1\r\n") redisFalse = []byte(":0\r\n") ) type Connection struct { conn net.Conn session Session maxBulkStringSize int64 boolArray []bool } func NewConnection(conn net.Conn, session Session, maxBulkStringSize int64) *Connection { return &Connection{ conn: conn, session: session, maxBulkStringSize: maxBulkStringSize, boolArray: []bool{}} } func (c *Connection) Run() { defer func() { c.session.Close() c.conn.Close() }() rce := NewRedisCommandExecutor(c) r := bufio.NewReaderSize(c.conn, 8*1024) parser := NewRedisParser(r, rce, c.maxBulkStringSize) parser.Parse() log.Println("client disconnected") } func logError(err error) bool { log.Printf("ERROR: %s\n", err) return false } func (c *Connection) Hget(hash, key []byte) bool { var err error var data []byte if data, err = c.session.Fetch(hash, key); err != nil { return c.writeError(err) } return c.writeBlock(data) } func (c *Connection) Hset(hash, key, data []byte) bool { var err error var exists bool if exists, err = c.session.Store(hash, key, data); err != nil { return c.writeError(err) } if c.session.InTransaction() { c.boolArray = append(c.boolArray, exists) return c.writeQueued() } return c.writeBool(exists) } func (c *Connection) Multi() bool { if c.session.InTransaction() { log.Println("WARN: Already running transaction.") } else { if err := c.session.BeginTransaction(); err != nil { return c.writeError(err) } } return c.writeOk() } func (c *Connection) Exec() bool { if !c.session.InTransaction() { return c.writeEmptyArray() } arr := c.boolArray c.boolArray = []bool{} if err := c.session.CommitTransaction(); err != nil { return c.writeError(err) } return c.writeBoolArray(arr) } func (c *Connection) Hkeys(hash []byte) bool { var ( err error n int keys chan []byte done = make(chan struct{}) ) defer close(done) if keys, n, err = c.session.AllKeys(hash, done); err != nil { return c.writeError(err) } if n == 0 { return c.writeEmptyArray() } if _, err := c.conn.Write(redisLength('*', n)); err != nil { return logError(err) } for key := range keys { if err = c.writeBulkString(key); err != nil { return logError(err) } } return true } func (c *Connection) Ping() bool { if _, err := c.conn.Write(redisPong); err != nil { return logError(err) } return true } func (c *Connection) HSpatial(hash, first, second []byte) bool { var ( err error blocks chan Block done = make(chan struct{}) ) defer close(done) if blocks, err = c.session.SpatialQuery(hash, first, second, done); err != nil { return c.writeError(err) } for block := range blocks { if err = c.writeBulkString(block.Key); err != nil { return logError(err) } if err = c.writeBulkString(block.Data); err != nil { return logError(err) } } if err = c.writeBulkString(nil); err != nil { return logError(err) } return true } func (c *Connection) writeError(err error) bool { logError(err) if _, err = c.conn.Write(redisError); err != nil { return logError(err) } return true } func (c *Connection) writeEmptyArray() bool { if _, err := c.conn.Write(redisEmptyArray); err != nil { return logError(err) } return true } func (c *Connection) writeBool(b bool) bool { var err error if b { _, err = c.conn.Write(redisTrue) } else { _, err = c.conn.Write(redisFalse) } if err != nil { return logError(err) } return true } func redisLength(prefix byte, s int) []byte { buf := append(make([]byte, 0, 16), prefix) return append(strconv.AppendInt(buf, int64(s), 10), '\r', '\n') } func (c *Connection) writeBoolArray(arr []bool) bool { if _, err := c.conn.Write(redisLength('*', len(arr))); err != nil { return logError(err) } for _, b := range arr { if !c.writeBool(b) { return false } } return true } func (c *Connection) writeOk() bool { if _, err := c.conn.Write(redisOk); err != nil { return logError(err) } return true } func (c *Connection) writeQueued() bool { if _, err := c.conn.Write(redisQueued); err != nil { return logError(err) } return true } func (c *Connection) writeBlock(data []byte) bool { if err := c.writeBulkString(data); err != nil { return logError(err) } return true } func (c *Connection) writeBulkString(data []byte) (err error) { con := c.conn if data == nil { _, err = con.Write(redisNoSuchBlock) } else { if _, err = con.Write(redisLength('$', len(data))); err != nil { return } if _, err = con.Write(data); err != nil { return } _, err = con.Write(redisCrnl) } return }