Added block consumer for SQLite3 in interleaver.

This commit is contained in:
Sascha L. Teichmann 2014-08-20 21:05:36 +02:00
parent 87b34d123e
commit cfb6edcd29

View File

@ -6,6 +6,7 @@ package main
import (
"database/sql"
"errors"
"os"
"bitbucket.org/s_l_teichmann/mtredisalize/common"
@ -14,19 +15,81 @@ import (
)
const (
fetchSql = "SELECT pos, data FROM blocks"
createSql = "CREATE TABLE blocks (pos INT NOT NULL PRIMARY KEY, data BLOB)"
insertSql = "INSERT INTO blocks (pos, data) VALUES (?, ?)"
deleteSql = "DELETE FROM blocks"
selectSql = "SELECT pos, data FROM blocks"
)
type SQLiteBlockProducer struct {
db *sql.DB
rows *sql.Rows
splitter common.KeySplitter
var DatabaseNotExistsErr = errors.New("Database does not exists.")
type (
SQLiteBlockProducer struct {
db *sql.DB
rows *sql.Rows
splitter common.KeySplitter
}
SQLiteBlockConsumer struct {
db *sql.DB
insertStmt *sql.Stmt
joiner common.KeyJoiner
}
)
func fileExists(path string) bool {
_, err := os.Stat(path)
return !os.IsNotExist(err)
}
func NewSQLiteBlockProducer(path string, splitter common.KeySplitter) (sbp *SQLiteBlockProducer, err error) {
func NewSQLiteBlockConsumer(
path string,
joiner common.KeyJoiner) (sbc *SQLiteBlockConsumer, err error) {
// check if we can stat it -> exists.
if _, err = os.Stat(path); err != nil {
createNew := !fileExists(path)
var db *sql.DB
if db, err = sql.Open("sqlite3", path); err != nil {
return
}
if createNew {
if _, err = db.Exec(createSql); err != nil {
db.Close()
return
}
}
var insertStmt *sql.Stmt
if insertStmt, err = db.Prepare(insertSql); err != nil {
db.Close()
return
}
sbc = &SQLiteBlockConsumer{
db: db,
insertStmt: insertStmt,
joiner: joiner}
return
}
func (sbc *SQLiteBlockConsumer) Close() error {
sbc.insertStmt.Close()
return sbc.db.Close()
}
func (sbc *SQLiteBlockConsumer) Consume(block *Block) (err error) {
_, err = sbc.insertStmt.Exec(sbc.joiner(block.Coord), block.Data)
return
}
func NewSQLiteBlockProducer(
path string,
splitter common.KeySplitter) (sbp *SQLiteBlockProducer, err error) {
if !fileExists(path) {
err = DatabaseNotExistsErr
return
}
@ -36,7 +99,7 @@ func NewSQLiteBlockProducer(path string, splitter common.KeySplitter) (sbp *SQLi
}
var rows *sql.Rows
if rows, err = db.Query(fetchSql); err != nil {
if rows, err = db.Query(selectSql); err != nil {
db.Close()
return
}