Abstracted backend as an interface and implemented the LevelDB stuff as such.

This commit is contained in:
Sascha L. Teichmann 2014-08-03 11:25:25 +02:00
parent 095164a6c8
commit da0b5ebba6
4 changed files with 146 additions and 87 deletions

10
backend.go Normal file
View File

@ -0,0 +1,10 @@
package main
type Backend interface {
Fetch(hash, key []byte) ([]byte, error)
InTransaction() bool
Store(hash, key, value []byte) (bool, error)
BeginTransaction() error
CommitTransAction() error
Shutdown() error
}

View File

@ -5,13 +5,11 @@ import (
"fmt" "fmt"
"log" "log"
"net" "net"
leveldb "github.com/jmhodges/levigo"
) )
var ( var (
redisOk = []byte("+OK\r\n") redisOk = []byte("+OK\r\n")
redisDbError = []byte("-FAIL\r\n") redisError = []byte("-ERR\r\n")
redisNoSuchBlock = []byte("$-1\r\n") redisNoSuchBlock = []byte("$-1\r\n")
redisCrnl = []byte("\r\n") redisCrnl = []byte("\r\n")
redisEmptyArray = []byte("*0\r\n") redisEmptyArray = []byte("*0\r\n")
@ -19,18 +17,16 @@ var (
) )
type Connection struct { type Connection struct {
conn net.Conn conn net.Conn
db *leveldb.DB backend Backend
tx *leveldb.WriteBatch boolArray []bool
intArray []int
} }
func NewConnection(conn net.Conn, db *leveldb.DB) *Connection { func NewConnection(conn net.Conn, backend Backend) *Connection {
return &Connection{ return &Connection{
conn, conn: conn,
db, backend: backend,
nil, boolArray: []bool{}}
[]int{}}
} }
func (c *Connection) Run() { func (c *Connection) Run() {
@ -46,90 +42,58 @@ func logError(err error) {
} }
func (c *Connection) Hget(hash, key []byte) bool { func (c *Connection) Hget(hash, key []byte) bool {
log.Printf("client requested block: %q", key)
var err error var err error
var data []byte var data []byte
ro := leveldb.NewReadOptions() if data, err = c.backend.Fetch(hash, key); err != nil {
defer ro.Close() return c.writeError(err)
if data, err = c.db.Get(ro, key); err != nil {
logError(err)
return c.writeError()
} }
return c.writeBlock(data) return c.writeBlock(data)
} }
func (c *Connection) Hset(hash, key, block []byte) bool { func (c *Connection) Hset(hash, key, data []byte) bool {
log.Printf("client wants to store block: %q", key)
var err error var err error
var exists int var exists bool
if exists, err = c.backend.Store(hash, key, data); err != nil {
if exists, err = c.keyExists(key); err != nil { return c.writeError(err)
logError(err)
return c.writeError()
} }
if c.tx != nil { if c.backend.InTransaction() {
c.tx.Put(key, block) c.boolArray = append(c.boolArray, exists)
c.intArray = append(c.intArray, exists)
return c.writeQueued() return c.writeQueued()
} }
wo := leveldb.NewWriteOptions() return c.writeBool(exists)
defer wo.Close()
if err = c.db.Put(wo, key, block); err != nil {
logError(err)
return c.writeError()
}
return c.writeInteger(exists)
}
func (c *Connection) keyExists(key []byte) (exists int, err error) {
ro := leveldb.NewReadOptions()
defer ro.Close()
var data []byte
if data, err = c.db.Get(ro, key); err != nil {
return
}
if data != nil {
exists = 1
} else {
exists = 0
}
return
} }
func (c *Connection) Multi() bool { func (c *Connection) Multi() bool {
if c.tx != nil { if c.backend.InTransaction() {
log.Println("WARN: Already running transaction.") log.Println("WARN: Already running transaction.")
} else { } else {
c.tx = leveldb.NewWriteBatch() if err := c.backend.BeginTransaction(); err != nil {
return c.writeError(err)
}
} }
return c.writeOk() return c.writeOk()
} }
func (c *Connection) Exec() bool { func (c *Connection) Exec() bool {
var err error if !c.backend.InTransaction() {
if c.tx == nil {
return c.writeEmptyArray() return c.writeEmptyArray()
} }
tx := c.tx arr := c.boolArray
c.tx = nil c.boolArray = []bool{}
defer tx.Close() if err := c.backend.CommitTransAction(); err != nil {
arr := c.intArray return c.writeError(err)
c.intArray = []int{}
wo := leveldb.NewWriteOptions()
defer wo.Close()
if err = c.db.Write(wo, tx); err != nil {
logError(err)
return c.writeError()
} }
return c.writeIntegerArray(arr) return c.writeBoolArray(arr)
} }
func (c *Connection) writeError() bool { func (c *Connection) writeError(err error) bool {
if _, err := c.conn.Write(redisDbError); err != nil { logError(err)
if _, err = c.conn.Write(redisError); err != nil {
logError(err) logError(err)
return false return false
} }
@ -137,28 +101,35 @@ func (c *Connection) writeError() bool {
} }
func (c *Connection) writeEmptyArray() bool { func (c *Connection) writeEmptyArray() bool {
if _, err := c.conn.Write(redisDbError); err != nil { if _, err := c.conn.Write(redisEmptyArray); err != nil {
logError(err) logError(err)
return false return false
} }
return true return true
} }
func (c *Connection) writeInteger(v int) bool { func asInt(b bool) int {
if _, err := c.conn.Write([]byte(fmt.Sprintf(":%d\r\n", v))); err != nil { if b {
return 1
}
return 0
}
func (c *Connection) writeBool(b bool) bool {
if _, err := c.conn.Write([]byte(fmt.Sprintf(":%d\r\n", asInt(b)))); err != nil {
logError(err) logError(err)
return false return false
} }
return true return true
} }
func (c *Connection) writeIntegerArray(arr []int) bool { func (c *Connection) writeBoolArray(arr []bool) bool {
if _, err := c.conn.Write([]byte(fmt.Sprintf("*%d\r\n", len(arr)))); err != nil { if _, err := c.conn.Write([]byte(fmt.Sprintf("*%d\r\n", len(arr)))); err != nil {
logError(err) logError(err)
return false return false
} }
for x := range arr { for _, b := range arr {
if !c.writeInteger(x) { if !c.writeBool(b) {
return false return false
} }
} }

87
leveldb.go Normal file
View File

@ -0,0 +1,87 @@
package main
import (
leveldb "github.com/jmhodges/levigo"
)
type LevelDBBackend struct {
cache *leveldb.Cache
db *leveldb.DB
tx *leveldb.WriteBatch
}
func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err error) {
cache := leveldb.NewLRUCache(cacheSize * 1024 * 1024)
opts := leveldb.NewOptions()
opts.SetCache(cache)
opts.SetCreateIfMissing(true)
var db *leveldb.DB
if db, err = leveldb.Open(path, opts); err != nil {
cache.Close()
return
}
ldb = &LevelDBBackend{
cache: cache,
db: db}
return
}
func (ldb *LevelDBBackend) Shutdown() error {
ldb.db.Close()
ldb.cache.Close()
return nil
}
func (ldb *LevelDBBackend) Fetch(hash, key []byte) ([]byte, error) {
ro := leveldb.NewReadOptions()
defer ro.Close()
return ldb.db.Get(ro, key)
}
func (ldb *LevelDBBackend) InTransaction() bool {
return ldb.tx != nil
}
func (ldb *LevelDBBackend) keyExists(key []byte) (exists bool, err error) {
ro := leveldb.NewReadOptions()
defer ro.Close()
var data []byte
if data, err = ldb.db.Get(ro, key); err != nil {
return
}
exists = data != nil
return
}
func (ldb *LevelDBBackend) Store(hash, key, value []byte) (exists bool, err error) {
if exists, err = ldb.keyExists(key); err != nil {
return
}
if ldb.tx != nil {
ldb.tx.Put(key, value)
return
}
wo := leveldb.NewWriteOptions()
defer wo.Close()
if err = ldb.db.Put(wo, key, value); err != nil {
return
}
return
}
func (ldb *LevelDBBackend) BeginTransaction() error {
ldb.tx = leveldb.NewWriteBatch()
return nil
}
func (ldb *LevelDBBackend) CommitTransAction() error {
tx := ldb.tx
ldb.tx = nil
wo := leveldb.NewWriteOptions()
defer wo.Close()
return ldb.db.Write(wo, tx)
}

17
main.go
View File

@ -7,8 +7,6 @@ import (
"net" "net"
"os" "os"
"os/signal" "os/signal"
leveldb "github.com/jmhodges/levigo"
) )
func main() { func main() {
@ -28,20 +26,13 @@ func main() {
log.Fatal("Missing path to world") log.Fatal("Missing path to world")
} }
cache := leveldb.NewLRUCache(cacheSize * 1024 * 1024)
defer cache.Close()
opts := leveldb.NewOptions()
opts.SetCache(cache)
opts.SetCreateIfMissing(true)
var err error var err error
var db *leveldb.DB var backend Backend
if db, err = leveldb.Open(args[0], opts); err != nil { if backend, err = NewLeveDBBackend(args[0], cacheSize); err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer db.Close() defer backend.Shutdown()
var listener net.Listener var listener net.Listener
@ -71,7 +62,7 @@ func main() {
for { for {
select { select {
case conn := <-connChan: case conn := <-connChan:
go NewConnection(conn, db).Run() go NewConnection(conn, backend).Run()
case <-sigChan: case <-sigChan:
log.Println("Shutting down") log.Println("Shutting down")
return return