Added SQLite3 backend support.

This commit is contained in:
Sascha L. Teichmann 2014-08-03 14:52:24 +02:00
parent da0b5ebba6
commit ccb6849903
6 changed files with 216 additions and 23 deletions

View File

@ -5,6 +5,6 @@ type Backend interface {
InTransaction() bool
Store(hash, key, value []byte) (bool, error)
BeginTransaction() error
CommitTransAction() error
CommitTransaction() error
Shutdown() error
}

View File

@ -85,7 +85,7 @@ func (c *Connection) Exec() bool {
}
arr := c.boolArray
c.boolArray = []bool{}
if err := c.backend.CommitTransAction(); err != nil {
if err := c.backend.CommitTransaction(); err != nil {
return c.writeError(err)
}
return c.writeBoolArray(arr)

View File

@ -27,6 +27,11 @@ func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err erro
}
func (ldb *LevelDBBackend) Shutdown() error {
tx := ldb.tx
if tx != nil {
ldb.tx = nil
tx.Close()
}
ldb.db.Close()
ldb.cache.Close()
return nil
@ -78,7 +83,7 @@ func (ldb *LevelDBBackend) BeginTransaction() error {
return nil
}
func (ldb *LevelDBBackend) CommitTransAction() error {
func (ldb *LevelDBBackend) CommitTransaction() error {
tx := ldb.tx
ldb.tx = nil
wo := leveldb.NewWriteOptions()

View File

@ -13,9 +13,11 @@ func main() {
var port int
var host string
var driver string
var cacheSize int
flag.IntVar(&port, "port", 6379, "port to bind")
flag.StringVar(&driver, "driver", "leveldb", "type of database (leveldb, sqlite)")
flag.StringVar(&host, "host", "", "host to bind")
flag.IntVar(&cacheSize, "cache", 32, "cache size in MB")
flag.Parse()
@ -29,9 +31,15 @@ func main() {
var err error
var backend Backend
if driver == "sqlite" {
if backend, err = NewSqliteBackend(args[0]); err != nil {
log.Fatal(err)
}
} else {
if backend, err = NewLeveDBBackend(args[0], cacheSize); err != nil {
log.Fatal(err)
}
}
defer backend.Shutdown()
var listener net.Listener

View File

@ -38,9 +38,7 @@ func (rp *RedisParser) nextLine() []byte {
rp.consumer.ConsumeError(err)
return nil
}
line = bytes.TrimRight(line, "\r\n")
log.Printf("%q", line)
return line
return bytes.TrimRight(line, "\r\n")
}
func (rp *RedisParser) dispatch(line []byte) bool {
@ -179,13 +177,13 @@ func (rce *RedisCommandExecutor) execute() bool {
}
hash, ok1 := rce.args[1].([]byte)
key, ok2 := rce.args[2].([]byte)
block, ok3 := rce.args[3].([]byte)
value, ok3 := rce.args[3].([]byte)
if !ok1 || !ok2 || !ok3 {
log.Printf("WARM HSET data are not byte slices,")
return false
}
return rce.commands.Hset(hash, key, block)
return rce.commands.Hset(hash, key, value)
case "MULTI":
return rce.commands.Multi()
@ -198,25 +196,14 @@ func (rce *RedisCommandExecutor) execute() bool {
}
func (rce *RedisCommandExecutor) ConsumeSimpleString(s string) bool {
log.Printf("simple string: %s", s)
return rce.push(s)
}
func shorten(data []byte) string {
if len(data) > 10 {
return fmt.Sprintf("%.10q...", data)
}
return fmt.Sprintf("%q", data)
}
func (rce *RedisCommandExecutor) ConsumeBulkString(data []byte) bool {
s := shorten(data)
log.Printf("buld string: len = %d: %s", len(data), s)
return rce.push(data)
}
func (rce *RedisCommandExecutor) ConsumeInteger(i int64) bool {
log.Printf("integer: %d", i)
return rce.push(i)
}
@ -226,7 +213,6 @@ func (rce *RedisCommandExecutor) ConsumeError(err error) bool {
}
func (rce *RedisCommandExecutor) ConsumeArray(i int64) bool {
log.Printf("array: %d", i)
if rce.missing > 0 {
log.Println("WARN: Nested arrays are not supported!")
return false

194
sqlite.go Normal file
View File

@ -0,0 +1,194 @@
package main
import (
"database/sql"
"log"
"strconv"
"sync"
_ "github.com/mattn/go-sqlite3"
)
var globalLock sync.RWMutex
const (
fetchSql = "SELECT data FROM blocks WHERE pos = ?"
existsSql = "SELECT 1 FROM blocks WHERE pos = ?"
updateSql = "UPDATE blocks SET data = ? WHERE pos = ?"
insertSql = "INSERT INTO blocks (pos, data) VALUES (?, ?)"
)
type SqliteBackend struct {
db *sql.DB
tx *sql.Tx
existsStmt *sql.Stmt
fetchStmt *sql.Stmt
insertStmt *sql.Stmt
updateStmt *sql.Stmt
}
func NewSqliteBackend(path string) (sqlb *SqliteBackend, err error) {
res := SqliteBackend{}
if res.db, err = sql.Open("sqlite3", path); err != nil {
return
}
if res.existsStmt, err = res.db.Prepare(existsSql); err != nil {
res.closeAll()
return
}
if res.fetchStmt, err = res.db.Prepare(fetchSql); err != nil {
res.closeAll()
return
}
if res.insertStmt, err = res.db.Prepare(insertSql); err != nil {
res.closeAll()
return
}
if res.updateStmt, err = res.db.Prepare(updateSql); err != nil {
res.closeAll()
return
}
sqlb = &res
return
}
func rollbackTx(tx **sql.Tx) error {
t := *tx
if t != nil {
*tx = nil
return t.Rollback()
}
return nil
}
func closeStmt(stmt **sql.Stmt) error {
s := *stmt
if s != nil {
*stmt = nil
return s.Close()
}
return nil
}
func closeDB(db **sql.DB) error {
d := *db
if d != nil {
*db = nil
return d.Close()
}
return nil
}
func (sqlb *SqliteBackend) closeAll() error {
rollbackTx(&sqlb.tx)
closeStmt(&sqlb.fetchStmt)
closeStmt(&sqlb.insertStmt)
closeStmt(&sqlb.updateStmt)
closeStmt(&sqlb.existsStmt)
return closeDB(&sqlb.db)
}
func (sqlb *SqliteBackend) Shutdown() error {
globalLock.Lock()
defer globalLock.Unlock()
return sqlb.closeAll()
}
func (sqlb *SqliteBackend) txStmt(stmt *sql.Stmt) *sql.Stmt {
if sqlb.tx != nil {
return sqlb.tx.Stmt(stmt)
}
return stmt
}
func bytes2pos(key []byte) (pos int64, err error) {
return strconv.ParseInt(string(key), 10, 64)
}
func (sqlb *SqliteBackend) Fetch(hash, key []byte) (data []byte, err error) {
var pos int64
if pos, err = bytes2pos(key); err != nil {
return
}
globalLock.RLock()
defer globalLock.RUnlock()
fetchStmt := sqlb.txStmt(sqlb.fetchStmt)
err2 := fetchStmt.QueryRow(pos).Scan(&data)
if err2 == sql.ErrNoRows {
return
}
err = err2
return
}
func (sqlb *SqliteBackend) InTransaction() bool {
return sqlb.tx != nil
}
func (sqlb *SqliteBackend) Store(hash, key, value []byte) (exists bool, err error) {
var pos int64
if pos, err = bytes2pos(key); err != nil {
return
}
globalLock.Lock()
defer globalLock.Unlock()
existsStmt := sqlb.txStmt(sqlb.existsStmt)
var x int
err2 := existsStmt.QueryRow(pos).Scan(&x)
if err2 == sql.ErrNoRows {
exists = false
} else if err2 != nil {
err = err2
return
} else {
exists = true
}
if exists {
updateStmt := sqlb.txStmt(sqlb.updateStmt)
_, err = updateStmt.Exec(value, pos)
} else {
insertStmt := sqlb.txStmt(sqlb.insertStmt)
_, err = insertStmt.Exec(pos, value)
}
return
}
func (sqlb *SqliteBackend) BeginTransaction() (err error) {
if sqlb.tx != nil {
log.Println("WARN: Already running transaction.")
return nil
}
globalLock.Lock()
defer globalLock.Unlock()
sqlb.tx, err = sqlb.db.Begin()
return
}
func (sqlb *SqliteBackend) CommitTransaction() error {
tx := sqlb.tx
if tx == nil {
log.Println("WARN: No transaction running.")
return nil
}
globalLock.Lock()
defer globalLock.Unlock()
sqlb.tx = nil
return tx.Commit()
}