diff --git a/backend.go b/backend.go new file mode 100644 index 0000000..9f92b1d --- /dev/null +++ b/backend.go @@ -0,0 +1,10 @@ +package main + +type Backend interface { + Fetch(hash, key []byte) ([]byte, error) + InTransaction() bool + Store(hash, key, value []byte) (bool, error) + BeginTransaction() error + CommitTransAction() error + Shutdown() error +} diff --git a/connection.go b/connection.go index d3d50b7..f1ca59f 100644 --- a/connection.go +++ b/connection.go @@ -5,13 +5,11 @@ import ( "fmt" "log" "net" - - leveldb "github.com/jmhodges/levigo" ) var ( redisOk = []byte("+OK\r\n") - redisDbError = []byte("-FAIL\r\n") + redisError = []byte("-ERR\r\n") redisNoSuchBlock = []byte("$-1\r\n") redisCrnl = []byte("\r\n") redisEmptyArray = []byte("*0\r\n") @@ -19,18 +17,16 @@ var ( ) type Connection struct { - conn net.Conn - db *leveldb.DB - tx *leveldb.WriteBatch - intArray []int + conn net.Conn + backend Backend + boolArray []bool } -func NewConnection(conn net.Conn, db *leveldb.DB) *Connection { +func NewConnection(conn net.Conn, backend Backend) *Connection { return &Connection{ - conn, - db, - nil, - []int{}} + conn: conn, + backend: backend, + boolArray: []bool{}} } func (c *Connection) Run() { @@ -46,90 +42,58 @@ func logError(err error) { } func (c *Connection) Hget(hash, key []byte) bool { - log.Printf("client requested block: %q", key) + var err error var data []byte - ro := leveldb.NewReadOptions() - defer ro.Close() - if data, err = c.db.Get(ro, key); err != nil { - logError(err) - return c.writeError() + if data, err = c.backend.Fetch(hash, key); err != nil { + return c.writeError(err) } + return c.writeBlock(data) } -func (c *Connection) Hset(hash, key, block []byte) bool { - log.Printf("client wants to store block: %q", key) +func (c *Connection) Hset(hash, key, data []byte) bool { var err error - var exists int - - if exists, err = c.keyExists(key); err != nil { - logError(err) - return c.writeError() + var exists bool + if exists, err = c.backend.Store(hash, key, data); err != nil { + return c.writeError(err) } - if c.tx != nil { - c.tx.Put(key, block) - c.intArray = append(c.intArray, exists) + if c.backend.InTransaction() { + c.boolArray = append(c.boolArray, exists) return c.writeQueued() } - wo := leveldb.NewWriteOptions() - defer wo.Close() - if err = c.db.Put(wo, key, block); err != nil { - logError(err) - return c.writeError() - } - - return c.writeInteger(exists) -} - -func (c *Connection) keyExists(key []byte) (exists int, err error) { - ro := leveldb.NewReadOptions() - defer ro.Close() - var data []byte - if data, err = c.db.Get(ro, key); err != nil { - return - } - if data != nil { - exists = 1 - } else { - exists = 0 - } - return + return c.writeBool(exists) } func (c *Connection) Multi() bool { - if c.tx != nil { + if c.backend.InTransaction() { log.Println("WARN: Already running transaction.") } else { - c.tx = leveldb.NewWriteBatch() + if err := c.backend.BeginTransaction(); err != nil { + return c.writeError(err) + } } return c.writeOk() } func (c *Connection) Exec() bool { - var err error - if c.tx == nil { + if !c.backend.InTransaction() { return c.writeEmptyArray() } - tx := c.tx - c.tx = nil - defer tx.Close() - arr := c.intArray - c.intArray = []int{} - wo := leveldb.NewWriteOptions() - defer wo.Close() - if err = c.db.Write(wo, tx); err != nil { - logError(err) - return c.writeError() + arr := c.boolArray + c.boolArray = []bool{} + if err := c.backend.CommitTransAction(); err != nil { + return c.writeError(err) } - return c.writeIntegerArray(arr) + return c.writeBoolArray(arr) } -func (c *Connection) writeError() bool { - if _, err := c.conn.Write(redisDbError); err != nil { +func (c *Connection) writeError(err error) bool { + logError(err) + if _, err = c.conn.Write(redisError); err != nil { logError(err) return false } @@ -137,28 +101,35 @@ func (c *Connection) writeError() bool { } func (c *Connection) writeEmptyArray() bool { - if _, err := c.conn.Write(redisDbError); err != nil { + if _, err := c.conn.Write(redisEmptyArray); err != nil { logError(err) return false } return true } -func (c *Connection) writeInteger(v int) bool { - if _, err := c.conn.Write([]byte(fmt.Sprintf(":%d\r\n", v))); err != nil { +func asInt(b bool) int { + if b { + return 1 + } + return 0 +} + +func (c *Connection) writeBool(b bool) bool { + if _, err := c.conn.Write([]byte(fmt.Sprintf(":%d\r\n", asInt(b)))); err != nil { logError(err) return false } return true } -func (c *Connection) writeIntegerArray(arr []int) bool { +func (c *Connection) writeBoolArray(arr []bool) bool { if _, err := c.conn.Write([]byte(fmt.Sprintf("*%d\r\n", len(arr)))); err != nil { logError(err) return false } - for x := range arr { - if !c.writeInteger(x) { + for _, b := range arr { + if !c.writeBool(b) { return false } } diff --git a/leveldb.go b/leveldb.go new file mode 100644 index 0000000..37b20e8 --- /dev/null +++ b/leveldb.go @@ -0,0 +1,87 @@ +package main + +import ( + leveldb "github.com/jmhodges/levigo" +) + +type LevelDBBackend struct { + cache *leveldb.Cache + db *leveldb.DB + tx *leveldb.WriteBatch +} + +func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err error) { + cache := leveldb.NewLRUCache(cacheSize * 1024 * 1024) + opts := leveldb.NewOptions() + opts.SetCache(cache) + opts.SetCreateIfMissing(true) + var db *leveldb.DB + if db, err = leveldb.Open(path, opts); err != nil { + cache.Close() + return + } + ldb = &LevelDBBackend{ + cache: cache, + db: db} + return +} + +func (ldb *LevelDBBackend) Shutdown() error { + ldb.db.Close() + ldb.cache.Close() + return nil +} + +func (ldb *LevelDBBackend) Fetch(hash, key []byte) ([]byte, error) { + ro := leveldb.NewReadOptions() + defer ro.Close() + return ldb.db.Get(ro, key) +} + +func (ldb *LevelDBBackend) InTransaction() bool { + return ldb.tx != nil +} + +func (ldb *LevelDBBackend) keyExists(key []byte) (exists bool, err error) { + ro := leveldb.NewReadOptions() + defer ro.Close() + var data []byte + if data, err = ldb.db.Get(ro, key); err != nil { + return + } + exists = data != nil + return +} + +func (ldb *LevelDBBackend) Store(hash, key, value []byte) (exists bool, err error) { + + if exists, err = ldb.keyExists(key); err != nil { + return + } + + if ldb.tx != nil { + ldb.tx.Put(key, value) + return + } + + wo := leveldb.NewWriteOptions() + defer wo.Close() + if err = ldb.db.Put(wo, key, value); err != nil { + return + } + + return +} + +func (ldb *LevelDBBackend) BeginTransaction() error { + ldb.tx = leveldb.NewWriteBatch() + return nil +} + +func (ldb *LevelDBBackend) CommitTransAction() error { + tx := ldb.tx + ldb.tx = nil + wo := leveldb.NewWriteOptions() + defer wo.Close() + return ldb.db.Write(wo, tx) +} diff --git a/main.go b/main.go index 17c7ec7..8f10844 100644 --- a/main.go +++ b/main.go @@ -7,8 +7,6 @@ import ( "net" "os" "os/signal" - - leveldb "github.com/jmhodges/levigo" ) func main() { @@ -28,20 +26,13 @@ func main() { log.Fatal("Missing path to world") } - cache := leveldb.NewLRUCache(cacheSize * 1024 * 1024) - defer cache.Close() - - opts := leveldb.NewOptions() - opts.SetCache(cache) - opts.SetCreateIfMissing(true) - var err error - var db *leveldb.DB + var backend Backend - if db, err = leveldb.Open(args[0], opts); err != nil { + if backend, err = NewLeveDBBackend(args[0], cacheSize); err != nil { log.Fatal(err) } - defer db.Close() + defer backend.Shutdown() var listener net.Listener @@ -71,7 +62,7 @@ func main() { for { select { case conn := <-connChan: - go NewConnection(conn, db).Run() + go NewConnection(conn, backend).Run() case <-sigChan: log.Println("Shutting down") return