diff --git a/backend.go b/backend.go index 9f92b1d..9085056 100644 --- a/backend.go +++ b/backend.go @@ -5,6 +5,6 @@ type Backend interface { InTransaction() bool Store(hash, key, value []byte) (bool, error) BeginTransaction() error - CommitTransAction() error + CommitTransaction() error Shutdown() error } diff --git a/connection.go b/connection.go index f1ca59f..d7f8da4 100644 --- a/connection.go +++ b/connection.go @@ -85,7 +85,7 @@ func (c *Connection) Exec() bool { } arr := c.boolArray c.boolArray = []bool{} - if err := c.backend.CommitTransAction(); err != nil { + if err := c.backend.CommitTransaction(); err != nil { return c.writeError(err) } return c.writeBoolArray(arr) diff --git a/leveldb.go b/leveldb.go index 37b20e8..e0e92fb 100644 --- a/leveldb.go +++ b/leveldb.go @@ -27,6 +27,11 @@ func NewLeveDBBackend(path string, cacheSize int) (ldb *LevelDBBackend, err erro } func (ldb *LevelDBBackend) Shutdown() error { + tx := ldb.tx + if tx != nil { + ldb.tx = nil + tx.Close() + } ldb.db.Close() ldb.cache.Close() return nil @@ -78,7 +83,7 @@ func (ldb *LevelDBBackend) BeginTransaction() error { return nil } -func (ldb *LevelDBBackend) CommitTransAction() error { +func (ldb *LevelDBBackend) CommitTransaction() error { tx := ldb.tx ldb.tx = nil wo := leveldb.NewWriteOptions() diff --git a/main.go b/main.go index 8f10844..d541e6d 100644 --- a/main.go +++ b/main.go @@ -13,9 +13,11 @@ func main() { var port int var host string + var driver string var cacheSize int flag.IntVar(&port, "port", 6379, "port to bind") + flag.StringVar(&driver, "driver", "leveldb", "type of database (leveldb, sqlite)") flag.StringVar(&host, "host", "", "host to bind") flag.IntVar(&cacheSize, "cache", 32, "cache size in MB") flag.Parse() @@ -29,9 +31,15 @@ func main() { var err error var backend Backend - if backend, err = NewLeveDBBackend(args[0], cacheSize); err != nil { - log.Fatal(err) - } + if driver == "sqlite" { + if backend, err = NewSqliteBackend(args[0]); err != nil { + log.Fatal(err) + } + } else { + if backend, err = NewLeveDBBackend(args[0], cacheSize); err != nil { + log.Fatal(err) + } + } defer backend.Shutdown() var listener net.Listener diff --git a/parser.go b/parser.go index f3dfa99..91636d6 100644 --- a/parser.go +++ b/parser.go @@ -38,9 +38,7 @@ func (rp *RedisParser) nextLine() []byte { rp.consumer.ConsumeError(err) return nil } - line = bytes.TrimRight(line, "\r\n") - log.Printf("%q", line) - return line + return bytes.TrimRight(line, "\r\n") } func (rp *RedisParser) dispatch(line []byte) bool { @@ -179,13 +177,13 @@ func (rce *RedisCommandExecutor) execute() bool { } hash, ok1 := rce.args[1].([]byte) key, ok2 := rce.args[2].([]byte) - block, ok3 := rce.args[3].([]byte) + value, ok3 := rce.args[3].([]byte) if !ok1 || !ok2 || !ok3 { log.Printf("WARM HSET data are not byte slices,") return false } - return rce.commands.Hset(hash, key, block) + return rce.commands.Hset(hash, key, value) case "MULTI": return rce.commands.Multi() @@ -198,25 +196,14 @@ func (rce *RedisCommandExecutor) execute() bool { } func (rce *RedisCommandExecutor) ConsumeSimpleString(s string) bool { - log.Printf("simple string: %s", s) return rce.push(s) } -func shorten(data []byte) string { - if len(data) > 10 { - return fmt.Sprintf("%.10q...", data) - } - return fmt.Sprintf("%q", data) -} - func (rce *RedisCommandExecutor) ConsumeBulkString(data []byte) bool { - s := shorten(data) - log.Printf("buld string: len = %d: %s", len(data), s) return rce.push(data) } func (rce *RedisCommandExecutor) ConsumeInteger(i int64) bool { - log.Printf("integer: %d", i) return rce.push(i) } @@ -226,7 +213,6 @@ func (rce *RedisCommandExecutor) ConsumeError(err error) bool { } func (rce *RedisCommandExecutor) ConsumeArray(i int64) bool { - log.Printf("array: %d", i) if rce.missing > 0 { log.Println("WARN: Nested arrays are not supported!") return false diff --git a/sqlite.go b/sqlite.go new file mode 100644 index 0000000..32e46c2 --- /dev/null +++ b/sqlite.go @@ -0,0 +1,194 @@ +package main + +import ( + "database/sql" + "log" + "strconv" + "sync" + + _ "github.com/mattn/go-sqlite3" +) + +var globalLock sync.RWMutex + +const ( + fetchSql = "SELECT data FROM blocks WHERE pos = ?" + existsSql = "SELECT 1 FROM blocks WHERE pos = ?" + updateSql = "UPDATE blocks SET data = ? WHERE pos = ?" + insertSql = "INSERT INTO blocks (pos, data) VALUES (?, ?)" +) + +type SqliteBackend struct { + db *sql.DB + tx *sql.Tx + existsStmt *sql.Stmt + fetchStmt *sql.Stmt + insertStmt *sql.Stmt + updateStmt *sql.Stmt +} + +func NewSqliteBackend(path string) (sqlb *SqliteBackend, err error) { + + res := SqliteBackend{} + + if res.db, err = sql.Open("sqlite3", path); err != nil { + return + } + + if res.existsStmt, err = res.db.Prepare(existsSql); err != nil { + res.closeAll() + return + } + + if res.fetchStmt, err = res.db.Prepare(fetchSql); err != nil { + res.closeAll() + return + } + + if res.insertStmt, err = res.db.Prepare(insertSql); err != nil { + res.closeAll() + return + } + + if res.updateStmt, err = res.db.Prepare(updateSql); err != nil { + res.closeAll() + return + } + + sqlb = &res + return +} + +func rollbackTx(tx **sql.Tx) error { + t := *tx + if t != nil { + *tx = nil + return t.Rollback() + } + return nil +} + +func closeStmt(stmt **sql.Stmt) error { + s := *stmt + if s != nil { + *stmt = nil + return s.Close() + } + return nil +} + +func closeDB(db **sql.DB) error { + d := *db + if d != nil { + *db = nil + return d.Close() + } + return nil +} + +func (sqlb *SqliteBackend) closeAll() error { + rollbackTx(&sqlb.tx) + closeStmt(&sqlb.fetchStmt) + closeStmt(&sqlb.insertStmt) + closeStmt(&sqlb.updateStmt) + closeStmt(&sqlb.existsStmt) + return closeDB(&sqlb.db) +} + +func (sqlb *SqliteBackend) Shutdown() error { + globalLock.Lock() + defer globalLock.Unlock() + + return sqlb.closeAll() +} + +func (sqlb *SqliteBackend) txStmt(stmt *sql.Stmt) *sql.Stmt { + if sqlb.tx != nil { + return sqlb.tx.Stmt(stmt) + } + return stmt +} + +func bytes2pos(key []byte) (pos int64, err error) { + return strconv.ParseInt(string(key), 10, 64) +} + +func (sqlb *SqliteBackend) Fetch(hash, key []byte) (data []byte, err error) { + var pos int64 + if pos, err = bytes2pos(key); err != nil { + return + } + + globalLock.RLock() + defer globalLock.RUnlock() + + fetchStmt := sqlb.txStmt(sqlb.fetchStmt) + err2 := fetchStmt.QueryRow(pos).Scan(&data) + if err2 == sql.ErrNoRows { + return + } + err = err2 + return +} + +func (sqlb *SqliteBackend) InTransaction() bool { + return sqlb.tx != nil +} + +func (sqlb *SqliteBackend) Store(hash, key, value []byte) (exists bool, err error) { + var pos int64 + if pos, err = bytes2pos(key); err != nil { + return + } + + globalLock.Lock() + defer globalLock.Unlock() + + existsStmt := sqlb.txStmt(sqlb.existsStmt) + var x int + err2 := existsStmt.QueryRow(pos).Scan(&x) + + if err2 == sql.ErrNoRows { + exists = false + } else if err2 != nil { + err = err2 + return + } else { + exists = true + } + + if exists { + updateStmt := sqlb.txStmt(sqlb.updateStmt) + _, err = updateStmt.Exec(value, pos) + } else { + insertStmt := sqlb.txStmt(sqlb.insertStmt) + _, err = insertStmt.Exec(pos, value) + } + return +} + +func (sqlb *SqliteBackend) BeginTransaction() (err error) { + if sqlb.tx != nil { + log.Println("WARN: Already running transaction.") + return nil + } + + globalLock.Lock() + defer globalLock.Unlock() + sqlb.tx, err = sqlb.db.Begin() + return +} + +func (sqlb *SqliteBackend) CommitTransaction() error { + + tx := sqlb.tx + if tx == nil { + log.Println("WARN: No transaction running.") + return nil + } + + globalLock.Lock() + defer globalLock.Unlock() + sqlb.tx = nil + return tx.Commit() +}