// Copyright 2014, 2015 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "bufio" "log" "net" "strconv" ) var ( redisOk = []byte("+OK\r\n") redisPong = []byte("+PONG\r\n") redisError = []byte("-ERR\r\n") redisNoSuchBlock = []byte("$-1\r\n") redisCrnl = []byte("\r\n") redisEmptyArray = []byte("*0\r\n") redisQueued = []byte("+QUEUED\r\n") redisTrue = []byte(":1\r\n") redisFalse = []byte(":0\r\n") ) type connection struct { conn net.Conn session session maxBulkStringSize int64 boolArray []bool } func newConnection(conn net.Conn, sess session, maxBulkStringSize int64) *connection { return &connection{ conn: conn, session: sess, maxBulkStringSize: maxBulkStringSize, boolArray: []bool{}} } func (c *connection) run() { defer func() { c.session.close() c.conn.Close() }() r := bufio.NewReaderSize(c.conn, 8*1024) parser := newRedisParser(r, c, c.maxBulkStringSize) parser.parse() log.Println("client disconnected") } func logError(err error) bool { if err != nil { log.Printf("ERROR: %s\n", err) return false } return true } func (c *connection) hdel(hash, key []byte) bool { success, err := c.session.del(hash, key) if err != nil { return c.writeError(err) } return c.writeBool(success) } func (c *connection) hget(hash, key []byte) bool { var err error var data []byte if data, err = c.session.fetch(hash, key); err != nil { return c.writeError(err) } return c.writeBlock(data) } func (c *connection) hset(hash, key, data []byte) bool { var err error var exists bool if exists, err = c.session.store(hash, key, data); err != nil { return c.writeError(err) } if c.session.inTransaction() { c.boolArray = append(c.boolArray, exists) return c.writeQueued() } return c.writeBool(exists) } func (c *connection) multi() bool { if c.session.inTransaction() { log.Println("WARN: Already running transaction.") } else { if err := c.session.beginTransaction(); err != nil { return c.writeError(err) } } return c.writeOk() } func (c *connection) exec() bool { if !c.session.inTransaction() { return c.writeEmptyArray() } arr := c.boolArray c.boolArray = []bool{} if err := c.session.commitTransaction(); err != nil { return c.writeError(err) } return c.writeBoolArray(arr) } func (c *connection) hkeys(hash []byte) bool { var ( err error n int keys <-chan []byte done = make(chan struct{}) ) defer close(done) if keys, n, err = c.session.allKeys(hash, done); err != nil { return c.writeError(err) } if n == 0 { return c.writeEmptyArray() } if _, err := c.conn.Write(redisLength('*', n)); err != nil { return logError(err) } for key := range keys { if err = c.writeBulkString(key); err != nil { return logError(err) } } return true } func (c *connection) ping() bool { return c.writeMessage(redisPong) } func (c *connection) hSpatial(hash, first, second []byte) bool { var ( err error blocks <-chan block done = make(chan struct{}) ) defer close(done) if blocks, err = c.session.spatialQuery(hash, first, second, done); err != nil { return c.writeError(err) } for block := range blocks { if err = c.writeBulkString(block.Key); err != nil { return logError(err) } if err = c.writeBulkString(block.Data); err != nil { return logError(err) } } return logError(c.writeBulkString(nil)) } func (c *connection) writeError(err error) bool { logError(err) return c.writeMessage(redisError) } func (c *connection) writeEmptyArray() bool { return c.writeMessage(redisEmptyArray) } func (c *connection) writeBool(b bool) bool { if b { return c.writeMessage(redisTrue) } return c.writeMessage(redisFalse) } func redisLength(prefix byte, s int) []byte { buf := append(make([]byte, 0, 16), prefix) return append(strconv.AppendInt(buf, int64(s), 10), '\r', '\n') } func (c *connection) writeBoolArray(arr []bool) bool { if _, err := c.conn.Write(redisLength('*', len(arr))); err != nil { return logError(err) } for _, b := range arr { if !c.writeBool(b) { return false } } return true } func (c *connection) writeMessage(msg []byte) bool { _, err := c.conn.Write(msg) return logError(err) } func (c *connection) writeOk() bool { return c.writeMessage(redisOk) } func (c *connection) writeQueued() bool { return c.writeMessage(redisQueued) } func (c *connection) writeBlock(data []byte) bool { return logError(c.writeBulkString(data)) } func (c *connection) writeBulkString(data []byte) (err error) { con := c.conn if data == nil { _, err = con.Write(redisNoSuchBlock) } else { if _, err = con.Write(redisLength('$', len(data))); err != nil { return } if _, err = con.Write(data); err != nil { return } _, err = con.Write(redisCrnl) } return }