diff --git a/interleaver/sqlite.go b/interleaver/sqlite.go index 52f1813..e1789b8 100644 --- a/interleaver/sqlite.go +++ b/interleaver/sqlite.go @@ -23,6 +23,8 @@ const ( var ErrDatabaseNotExists = errors.New("Database does not exists.") +const blocksPerTx = 128 + type ( SQLiteBlockProducer struct { db *sql.DB @@ -33,6 +35,8 @@ type ( SQLiteBlockConsumer struct { db *sql.DB insertStmt *sql.Stmt + tx *sql.Tx + txCounter int joiner common.KeyJoiner } ) @@ -58,6 +62,11 @@ func NewSQLiteBlockConsumer( db.Close() return } + } else { + if _, err = db.Exec(deleteSql); err != nil { + db.Close() + return + } } var insertStmt *sql.Stmt @@ -66,21 +75,49 @@ func NewSQLiteBlockConsumer( return } + var tx *sql.Tx + if tx, err = db.Begin(); err != nil { + insertStmt.Close() + db.Close() + return + } + sbc = &SQLiteBlockConsumer{ db: db, insertStmt: insertStmt, + tx: tx, joiner: joiner} return } func (sbc *SQLiteBlockConsumer) Close() error { + sbc.tx.Commit() sbc.insertStmt.Close() return sbc.db.Close() } +func (sbc *SQLiteBlockConsumer) getTx() (tx *sql.Tx, err error) { + if sbc.txCounter >= blocksPerTx { + sbc.txCounter = 0 + if err = sbc.tx.Commit(); err != nil { + return + } + if sbc.tx, err = sbc.db.Begin(); err != nil { + return + } + } + sbc.txCounter++ + tx = sbc.tx + return +} + func (sbc *SQLiteBlockConsumer) Consume(block *Block) (err error) { - _, err = sbc.insertStmt.Exec(sbc.joiner(block.Coord), block.Data) + var tx *sql.Tx + if tx, err = sbc.getTx(); err != nil { + return + } + _, err = tx.Stmt(sbc.insertStmt).Exec(sbc.joiner(block.Coord), block.Data) return }