mtsatellite/connection.go

207 lines
3.8 KiB
Go
Raw Normal View History

package main
import (
"bufio"
"fmt"
"log"
"net"
leveldb "github.com/jmhodges/levigo"
)
// Pre-encoded Redis protocol replies that are written to clients verbatim.
var (
// redisOk is the "+OK" status reply.
redisOk = []byte("+OK\r\n")
// redisDbError is the "-FAIL" error reply sent by writeError.
redisDbError = []byte("-FAIL\r\n")
// redisNoSuchBlock is the "$-1" reply written by writeBulkString when the
// data to send is nil.
redisNoSuchBlock = []byte("$-1\r\n")
// redisCrnl is the CR LF terminator written after a bulk string payload.
redisCrnl = []byte("\r\n")
// redisEmptyArray is the "*0" reply, i.e. an array with zero elements.
redisEmptyArray = []byte("*0\r\n")
// redisQueued is the "+QUEUED" status reply sent by writeQueued.
redisQueued = []byte("+QUEUED\r\n")
)
// Connection holds the per-client state: the network connection, the
// database handle and the state of an open transaction, if any.
type Connection struct {
// conn is the connection that commands are read from and replies written to.
conn net.Conn
// db is the LevelDB handle used for reads and writes.
db *leveldb.DB
// tx is the pending write batch; it is non-nil only between Multi and Exec.
tx *leveldb.WriteBatch
// intArray collects the per-Hset "key existed" results while a transaction
// is open; Exec sends them back and resets the slice.
intArray []int
}
// NewConnection returns a Connection for the given network connection and
// database handle. No transaction is open and the result list is empty.
func NewConnection(conn net.Conn, db *leveldb.DB) *Connection {
	return &Connection{
		conn:     conn,
		db:       db,
		tx:       nil,
		intArray: []int{},
	}
}
// Run serves the connection: it creates a command executor bound to c, wraps
// the connection in a buffered reader and runs a Redis parser over it.
// The connection is closed when Run returns.
func (c *Connection) Run() {
	defer c.conn.Close()

	executor := NewRedisCommandExecutor(c)
	input := bufio.NewReader(c.conn)
	NewRedisParser(input, executor).Parse()
}
// logError writes err to the standard logger with an "ERROR: " prefix.
func logError(err error) {
	const format = "ERROR: %s"
	log.Printf(format, err)
}
// Hget looks up key in the database and sends the stored block to the client
// as a bulk string. The hash argument is not used. A database error is logged
// and answered with the error reply. The return value is the result of the
// reply write.
func (c *Connection) Hget(hash, key []byte) bool {
	log.Printf("client requested block: %q", key)

	opts := leveldb.NewReadOptions()
	defer opts.Close()

	block, err := c.db.Get(opts, key)
	if err != nil {
		logError(err)
		return c.writeError()
	}
	return c.writeBlock(block)
}
// Hset stores block under key. The hash argument is not used.
//
// The database is first asked whether key already holds a value (1) or not
// (0). Inside a transaction the write is added to the pending batch, the flag
// is remembered for Exec and "+QUEUED" is sent. Outside a transaction the
// block is written immediately and the flag is sent as an integer reply.
// Database errors are logged and answered with the error reply. The return
// value is the result of the reply write.
//
// NOTE(review): Redis HSET replies 1 for a newly created field, whereas this
// replies 1 when the key already existed — confirm this is what clients expect.
func (c *Connection) Hset(hash, key, block []byte) bool {
	log.Printf("client wants to store block: %q", key)

	existed, err := c.keyExists(key)
	if err != nil {
		logError(err)
		return c.writeError()
	}

	if c.tx == nil {
		// No transaction open: write straight through to the database.
		opts := leveldb.NewWriteOptions()
		defer opts.Close()
		if err := c.db.Put(opts, key, block); err != nil {
			logError(err)
			return c.writeError()
		}
		return c.writeInteger(existed)
	}

	// Transaction open: defer the write and the reply value until Exec.
	c.tx.Put(key, block)
	c.intArray = append(c.intArray, existed)
	return c.writeQueued()
}
// keyExists reports whether the database returns a non-nil value for key:
// exists is 1 if it does and 0 otherwise. On a read error exists is 0 and err
// is the database error.
func (c *Connection) keyExists(key []byte) (exists int, err error) {
	opts := leveldb.NewReadOptions()
	defer opts.Close()

	value, err := c.db.Get(opts, key)
	if err != nil {
		return 0, err
	}
	if value == nil {
		return 0, nil
	}
	return 1, nil
}
// Multi opens a transaction by creating a fresh write batch. If a transaction
// is already open it is kept and a warning is logged. "+OK" is sent in both
// cases; the return value is the result of that write.
func (c *Connection) Multi() bool {
	if c.tx == nil {
		c.tx = leveldb.NewWriteBatch()
	} else {
		log.Println("WARN: Already running transaction.")
	}
	return c.writeOk()
}
// Exec commits the open transaction. Without an open transaction it replies
// via writeEmptyArray. Otherwise the transaction state is detached from the
// connection, the batch is written to the database and the queued Hset
// results are sent as an integer array. A write error is logged and answered
// with the error reply. The return value is the result of the reply write.
func (c *Connection) Exec() bool {
	if c.tx == nil {
		return c.writeEmptyArray()
	}

	// Reset the connection state first so that it is clean however the
	// database write turns out.
	batch, results := c.tx, c.intArray
	c.tx, c.intArray = nil, []int{}
	defer batch.Close()

	opts := leveldb.NewWriteOptions()
	defer opts.Close()

	if err := c.db.Write(opts, batch); err != nil {
		logError(err)
		return c.writeError()
	}
	return c.writeIntegerArray(results)
}
// writeError sends the "-FAIL" error reply to the client. It returns false,
// after logging the error, if the write fails.
func (c *Connection) writeError() bool {
	_, err := c.conn.Write(redisDbError)
	if err == nil {
		return true
	}
	logError(err)
	return false
}
// writeEmptyArray sends the empty array reply ("*0") to the client. It is
// used when there is nothing to report, e.g. Exec without an open
// transaction. It returns false, after logging the error, if the write fails.
func (c *Connection) writeEmptyArray() bool {
	// Send the empty array, not the error reply: having nothing to execute
	// is not a database failure.
	if _, err := c.conn.Write(redisEmptyArray); err != nil {
		logError(err)
		return false
	}
	return true
}
// writeInteger sends v to the client as an integer reply (":<v>\r\n"). It
// returns false, after logging the error, if the write fails.
func (c *Connection) writeInteger(v int) bool {
	_, err := fmt.Fprintf(c.conn, ":%d\r\n", v)
	if err == nil {
		return true
	}
	logError(err)
	return false
}
// writeIntegerArray sends arr to the client as an array reply: a "*<len>"
// header followed by one integer reply per element, in order. It returns
// false as soon as any write fails; a failure writing the header is logged
// here, and element writes are delegated to writeInteger.
func (c *Connection) writeIntegerArray(arr []int) bool {
	if _, err := c.conn.Write([]byte(fmt.Sprintf("*%d\r\n", len(arr)))); err != nil {
		logError(err)
		return false
	}
	// Range over the values: a single-variable range would yield the indices
	// 0..len-1 instead of the stored results.
	for _, v := range arr {
		if !c.writeInteger(v) {
			return false
		}
	}
	return true
}
// writeOk sends the "+OK" status reply to the client. It returns false, after
// logging the error, if the write fails.
func (c *Connection) writeOk() bool {
	_, err := c.conn.Write(redisOk)
	if err == nil {
		return true
	}
	logError(err)
	return false
}
// writeQueued sends the "+QUEUED" status reply to the client. It returns
// false, after logging the error, if the write fails.
func (c *Connection) writeQueued() bool {
	_, err := c.conn.Write(redisQueued)
	if err == nil {
		return true
	}
	logError(err)
	return false
}
// writeBlock sends data to the client via writeBulkString. It returns false,
// after logging the error, if that fails.
func (c *Connection) writeBlock(data []byte) bool {
	err := c.writeBulkString(data)
	if err != nil {
		logError(err)
	}
	return err == nil
}
// writeBulkString sends data to the client as a bulk string. Nil data is sent
// as the "$-1" reply. Anything else, including an empty non-nil slice, is sent
// as a "$<len>" header, the payload and a trailing CR LF, in three separate
// writes. The first write error stops the sequence and is returned.
func (c *Connection) writeBulkString(data []byte) (err error) {
	if data == nil {
		_, err = c.conn.Write(redisNoSuchBlock)
		return err
	}

	header := []byte(fmt.Sprintf("$%d\r\n", len(data)))
	for _, part := range [][]byte{header, data, redisCrnl} {
		if _, err = c.conn.Write(part); err != nil {
			return err
		}
	}
	return nil
}