diff --git a/interleaver/sqlite.go b/interleaver/sqlite.go index 6ca4f1a..a202455 100644 --- a/interleaver/sqlite.go +++ b/interleaver/sqlite.go @@ -6,6 +6,7 @@ package main import ( "database/sql" + "errors" "os" "bitbucket.org/s_l_teichmann/mtredisalize/common" @@ -14,19 +15,81 @@ import ( ) const ( - fetchSql = "SELECT pos, data FROM blocks" + createSql = "CREATE TABLE blocks (pos INT NOT NULL PRIMARY KEY, data BLOB)" + insertSql = "INSERT INTO blocks (pos, data) VALUES (?, ?)" + deleteSql = "DELETE FROM blocks" + selectSql = "SELECT pos, data FROM blocks" ) -type SQLiteBlockProducer struct { - db *sql.DB - rows *sql.Rows - splitter common.KeySplitter +var DatabaseNotExistsErr = errors.New("Database does not exists.") + +type ( + SQLiteBlockProducer struct { + db *sql.DB + rows *sql.Rows + splitter common.KeySplitter + } + + SQLiteBlockConsumer struct { + db *sql.DB + insertStmt *sql.Stmt + joiner common.KeyJoiner + } +) + +func fileExists(path string) bool { + _, err := os.Stat(path) + return !os.IsNotExist(err) } -func NewSQLiteBlockProducer(path string, splitter common.KeySplitter) (sbp *SQLiteBlockProducer, err error) { +func NewSQLiteBlockConsumer( + path string, + joiner common.KeyJoiner) (sbc *SQLiteBlockConsumer, err error) { - // check if we can stat it -> exists. - if _, err = os.Stat(path); err != nil { + createNew := !fileExists(path) + + var db *sql.DB + if db, err = sql.Open("sqlite3", path); err != nil { + return + } + + if createNew { + if _, err = db.Exec(createSql); err != nil { + db.Close() + return + } + } + + var insertStmt *sql.Stmt + if insertStmt, err = db.Prepare(insertSql); err != nil { + db.Close() + return + } + + sbc = &SQLiteBlockConsumer{ + db: db, + insertStmt: insertStmt, + joiner: joiner} + + return +} + +func (sbc *SQLiteBlockConsumer) Close() error { + sbc.insertStmt.Close() + return sbc.db.Close() +} + +func (sbc *SQLiteBlockConsumer) Consume(block *Block) (err error) { + _, err = sbc.insertStmt.Exec(sbc.joiner(block.Coord), block.Data) + return +} + +func NewSQLiteBlockProducer( + path string, + splitter common.KeySplitter) (sbp *SQLiteBlockProducer, err error) { + + if !fileExists(path) { + err = DatabaseNotExistsErr return } @@ -36,7 +99,7 @@ func NewSQLiteBlockProducer(path string, splitter common.KeySplitter) (sbp *SQLi } var rows *sql.Rows - if rows, err = db.Query(fetchSql); err != nil { + if rows, err = db.Query(selectSql); err != nil { db.Close() return }