// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.

package main

import (
	"database/sql"
	"errors"
	"os"

	"bitbucket.org/s_l_teichmann/mtsatellite/common"

	_ "github.com/mattn/go-sqlite3"
)

const (
	createSQL = "CREATE TABLE blocks (pos INT NOT NULL PRIMARY KEY, data BLOB)"
	insertSQL = "INSERT INTO blocks (pos, data) VALUES (?, ?)"
	deleteSQL = "DELETE FROM blocks"
	selectSQL = "SELECT pos, data FROM blocks"
)

// ErrDatabaseNotExists indicates that the database does not exist.
var ErrDatabaseNotExists = errors.New("database does not exists")

const blocksPerTx = 128 // Number of blocks copied in a transaction.

type (
	// SQLiteBlockProducer helps getting blocks from a SQLite database.
	SQLiteBlockProducer struct {
		db       *sql.DB
		rows     *sql.Rows
		splitter common.KeySplitter
	}

	// SQLiteBlockConsumer helps storing blocks into a SQLite database.
	SQLiteBlockConsumer struct {
		db         *sql.DB
		insertStmt *sql.Stmt
		tx         *sql.Tx
		txCounter  int
		joiner     common.KeyJoiner
	}
)

func fileExists(path string) bool {
	_, err := os.Stat(path)
	return !os.IsNotExist(err)
}

// NewSQLiteBlockConsumer returns a storage helper for SQLite databases.
func NewSQLiteBlockConsumer(
	path string,
	joiner common.KeyJoiner) (sbc *SQLiteBlockConsumer, err error) {

	createNew := !fileExists(path)

	var db *sql.DB
	if db, err = sql.Open("sqlite3", path); err != nil {
		return
	}

	if createNew {
		if _, err = db.Exec(createSQL); err != nil {
			db.Close()
			return
		}
	} else {
		if _, err = db.Exec(deleteSQL); err != nil {
			db.Close()
			return
		}
	}

	var insertStmt *sql.Stmt
	if insertStmt, err = db.Prepare(insertSQL); err != nil {
		db.Close()
		return
	}

	var tx *sql.Tx
	if tx, err = db.Begin(); err != nil {
		insertStmt.Close()
		db.Close()
		return
	}

	sbc = &SQLiteBlockConsumer{
		db:         db,
		insertStmt: insertStmt,
		tx:         tx,
		joiner:     joiner}

	return
}

// Close closes a SQLite storage helper.
func (sbc *SQLiteBlockConsumer) Close() error {
	sbc.tx.Commit()
	sbc.insertStmt.Close()
	return sbc.db.Close()
}

func (sbc *SQLiteBlockConsumer) getTx() (tx *sql.Tx, err error) {
	if sbc.txCounter >= blocksPerTx {
		sbc.txCounter = 0
		if err = sbc.tx.Commit(); err != nil {
			return
		}
		if sbc.tx, err = sbc.db.Begin(); err != nil {
			return
		}
	}
	sbc.txCounter++
	tx = sbc.tx
	return
}

// Consume stores a block in an SQLite database.
func (sbc *SQLiteBlockConsumer) Consume(block *common.Block) (err error) {
	var tx *sql.Tx
	if tx, err = sbc.getTx(); err != nil {
		return
	}
	_, err = tx.Stmt(sbc.insertStmt).Exec(sbc.joiner(block.Coord), block.Data)
	return
}

// NewSQLiteBlockProducer returns a new producer to fetch blocks from a
// SQLite database.
func NewSQLiteBlockProducer(
	path string,
	splitter common.KeySplitter) (sbp *SQLiteBlockProducer, err error) {

	if !fileExists(path) {
		err = ErrDatabaseNotExists
		return
	}

	var db *sql.DB
	if db, err = sql.Open("sqlite3", path); err != nil {
		return
	}

	var rows *sql.Rows
	if rows, err = db.Query(selectSQL); err != nil {
		db.Close()
		return
	}

	sbp = &SQLiteBlockProducer{
		db:       db,
		rows:     rows,
		splitter: splitter}

	return
}

// Next fetches the next block from a SQLite database.
func (sbp *SQLiteBlockProducer) Next(block *common.Block) (err error) {
	if sbp.rows == nil {
		err = common.ErrNoMoreBlocks
		return
	}
	if sbp.rows.Next() {
		var key int64
		if err = sbp.rows.Scan(&key, &block.Data); err == nil {
			block.Coord = sbp.splitter(key)
		}
	} else {
		sbp.rows.Close()
		sbp.rows = nil
		err = common.ErrNoMoreBlocks
	}
	return
}

// Close closes a block producer from a SQLite database.
func (sbp *SQLiteBlockProducer) Close() error {
	if sbp.rows != nil {
		sbp.rows.Close()
	}
	return sbp.db.Close()
}