diff --git a/connection.go b/connection.go new file mode 100644 index 0000000..d3d50b7 --- /dev/null +++ b/connection.go @@ -0,0 +1,206 @@ +package main + +import ( + "bufio" + "fmt" + "log" + "net" + + leveldb "github.com/jmhodges/levigo" +) + +var ( + redisOk = []byte("+OK\r\n") + redisDbError = []byte("-FAIL\r\n") + redisNoSuchBlock = []byte("$-1\r\n") + redisCrnl = []byte("\r\n") + redisEmptyArray = []byte("*0\r\n") + redisQueued = []byte("+QUEUED\r\n") +) + +type Connection struct { + conn net.Conn + db *leveldb.DB + tx *leveldb.WriteBatch + intArray []int +} + +func NewConnection(conn net.Conn, db *leveldb.DB) *Connection { + return &Connection{ + conn, + db, + nil, + []int{}} +} + +func (c *Connection) Run() { + defer c.conn.Close() + rce := NewRedisCommandExecutor(c) + r := bufio.NewReader(c.conn) + parser := NewRedisParser(r, rce) + parser.Parse() +} + +func logError(err error) { + log.Printf("ERROR: %s", err) +} + +func (c *Connection) Hget(hash, key []byte) bool { + log.Printf("client requested block: %q", key) + var err error + var data []byte + ro := leveldb.NewReadOptions() + defer ro.Close() + if data, err = c.db.Get(ro, key); err != nil { + logError(err) + return c.writeError() + } + return c.writeBlock(data) +} + +func (c *Connection) Hset(hash, key, block []byte) bool { + log.Printf("client wants to store block: %q", key) + + var err error + var exists int + + if exists, err = c.keyExists(key); err != nil { + logError(err) + return c.writeError() + } + + if c.tx != nil { + c.tx.Put(key, block) + c.intArray = append(c.intArray, exists) + return c.writeQueued() + } + + wo := leveldb.NewWriteOptions() + defer wo.Close() + if err = c.db.Put(wo, key, block); err != nil { + logError(err) + return c.writeError() + } + + return c.writeInteger(exists) +} + +func (c *Connection) keyExists(key []byte) (exists int, err error) { + ro := leveldb.NewReadOptions() + defer ro.Close() + var data []byte + if data, err = c.db.Get(ro, key); err != nil { + return + } + if data != nil { + exists = 1 + } else { + exists = 0 + } + return +} + +func (c *Connection) Multi() bool { + if c.tx != nil { + log.Println("WARN: Already running transaction.") + } else { + c.tx = leveldb.NewWriteBatch() + } + return c.writeOk() +} + +func (c *Connection) Exec() bool { + var err error + if c.tx == nil { + return c.writeEmptyArray() + } + tx := c.tx + c.tx = nil + defer tx.Close() + arr := c.intArray + c.intArray = []int{} + wo := leveldb.NewWriteOptions() + defer wo.Close() + if err = c.db.Write(wo, tx); err != nil { + logError(err) + return c.writeError() + } + return c.writeIntegerArray(arr) +} + +func (c *Connection) writeError() bool { + if _, err := c.conn.Write(redisDbError); err != nil { + logError(err) + return false + } + return true +} + +func (c *Connection) writeEmptyArray() bool { + if _, err := c.conn.Write(redisDbError); err != nil { + logError(err) + return false + } + return true +} + +func (c *Connection) writeInteger(v int) bool { + if _, err := c.conn.Write([]byte(fmt.Sprintf(":%d\r\n", v))); err != nil { + logError(err) + return false + } + return true +} + +func (c *Connection) writeIntegerArray(arr []int) bool { + if _, err := c.conn.Write([]byte(fmt.Sprintf("*%d\r\n", len(arr)))); err != nil { + logError(err) + return false + } + for x := range arr { + if !c.writeInteger(x) { + return false + } + } + return true +} + +func (c *Connection) writeOk() bool { + if _, err := c.conn.Write(redisOk); err != nil { + logError(err) + return false + } + return true +} + +func (c *Connection) writeQueued() bool { + if _, err := c.conn.Write(redisQueued); err != nil { + logError(err) + return false + } + return true +} + +func (c *Connection) writeBlock(data []byte) bool { + if err := c.writeBulkString(data); err != nil { + logError(err) + return false + } + return true +} + +func (c *Connection) writeBulkString(data []byte) (err error) { + con := c.conn + if data == nil { + _, err = con.Write(redisNoSuchBlock) + } else { + if _, err = con.Write([]byte(fmt.Sprintf("$%d\r\n", len(data)))); err != nil { + return + } + if _, err = con.Write(data); err != nil { + return + } + _, err = con.Write(redisCrnl) + } + return +} diff --git a/main.go b/main.go index 194dfaa..17c7ec7 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "flag" "fmt" "log" @@ -12,197 +11,6 @@ import ( leveldb "github.com/jmhodges/levigo" ) -type connection struct { - conn net.Conn - db *leveldb.DB - tx *leveldb.WriteBatch - intArray []int -} - -func newConnection(conn net.Conn, db *leveldb.DB) *connection { - return &connection{ - conn, - db, - nil, - []int{}} -} - -func (c *connection) run() { - defer c.conn.Close() - rce := NewRedisCommandExecutor(c) - r := bufio.NewReader(c.conn) - parser := NewRedisParser(r, rce) - parser.Parse() -} - -func (c *connection) Hget(hash, key []byte) bool { - log.Printf("client requested block: %q", key) - var err error - var data []byte - ro := leveldb.NewReadOptions() - defer ro.Close() - if data, err = c.db.Get(ro, key); err != nil { - log.Printf("Something is wrong with db: %s", err) - return c.writeError() - } - return c.writeBlock(data) -} - -func (c *connection) Hset(hash, key, block []byte) bool { - log.Printf("client wants to store block: %q", key) - - var err error - var exists int - - if exists, err = c.keyExists(key); err != nil { - log.Printf("Something is wrong with db: %s", err) - return c.writeError() - } - - if c.tx != nil { - c.tx.Put(key, block) - c.intArray = append(c.intArray, exists) - return c.writeQueued() - } - - wo := leveldb.NewWriteOptions() - defer wo.Close() - if err = c.db.Put(wo, key, block); err != nil { - log.Printf("Something is wrong with db: %s", err) - return c.writeError() - } - return c.writeInteger(exists) -} - -func (c *connection) keyExists(key []byte) (exists int, err error) { - ro := leveldb.NewReadOptions() - defer ro.Close() - var data []byte - if data, err = c.db.Get(ro, key); err != nil { - return - } - if data != nil { - exists = 1 - } else { - exists = 0 - } - return -} - -func (c *connection) Multi() bool { - if c.tx != nil { - log.Println("WARN: Already running transaction.") - } else { - c.tx = leveldb.NewWriteBatch() - } - return c.writeOk() -} - -func (c *connection) Exec() bool { - var err error - if c.tx == nil { - return c.writeEmptyArray() - } - tx := c.tx - c.tx = nil - defer tx.Close() - arr := c.intArray - c.intArray = []int{} - wo := leveldb.NewWriteOptions() - defer wo.Close() - if err = c.db.Write(wo, tx); err != nil { - log.Printf("Something went wrong in writing transaction: %s", err) - return c.writeError() - } - return c.writeIntegerArray(arr) -} - -var ( - redisOk = []byte("+OK\r\n") - redisDbError = []byte("-FAIL\r\n") - redisNoSuchBlock = []byte("$-1\r\n") - redisCrnl = []byte("\r\n") - redisEmptyArray = []byte("*0\r\n") - redisQueued = []byte("+QUEUED\r\n") -) - -func (c *connection) writeError() bool { - if _, err := c.conn.Write(redisDbError); err != nil { - log.Printf("Something went wrong writing to client %s", err) - return false - } - return true -} - -func (c *connection) writeEmptyArray() bool { - if _, err := c.conn.Write(redisDbError); err != nil { - log.Printf("Writing message to client failed: %s", err) - return false - } - return true -} - -func (c *connection) writeInteger(v int) bool { - if _, err := c.conn.Write([]byte(fmt.Sprintf(":%d\r\n", v))); err != nil { - log.Printf("Writing message to client failed: %s", err) - return false - } - return true -} - -func (c *connection) writeIntegerArray(arr []int) bool { - if _, err := c.conn.Write([]byte(fmt.Sprintf("*%d\r\n", len(arr)))); err != nil { - log.Printf("Writing message to client failed: %s", err) - return false - } - for x := range arr { - if !c.writeInteger(x) { - return false - } - } - return true -} - -func (c *connection) writeOk() bool { - if _, err := c.conn.Write(redisOk); err != nil { - log.Printf("Something went wrong writing to client %s", err) - return false - } - return true -} - -func (c *connection) writeQueued() bool { - if _, err := c.conn.Write(redisQueued); err != nil { - log.Printf("Something went wrong writing to client %s", err) - return false - } - return true -} - -func (c *connection) writeBlock(data []byte) bool { - if err := c.writeBulkString(data); err != nil { - log.Printf("Something went wrong writing to client %s", err) - return false - } - return true -} - -func (c *connection) writeBulkString(data []byte) (err error) { - con := c.conn - if data == nil { - _, err = con.Write(redisNoSuchBlock) - } else { - if _, err = con.Write([]byte(fmt.Sprintf("$%d\r\n", len(data)))); err != nil { - return - } - if _, err = con.Write(data); err != nil { - return - } - _, err = con.Write(redisCrnl) - } - return -} - func main() { var port int @@ -263,7 +71,7 @@ func main() { for { select { case conn := <-connChan: - go newConnection(conn, db).run() + go NewConnection(conn, db).Run() case <-sigChan: log.Println("Shutting down") return