// Copyright 2014, 2015 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.

package main

import (
	"os"

	"bitbucket.org/s_l_teichmann/mtsatellite/common"

	leveldb "github.com/jmhodges/levigo"
)

type (
	// LevelDBBlockProducer is a helper to fetch blocks from a LevelDB.
	LevelDBBlockProducer struct {
		db       *leveldb.DB
		opts     *leveldb.Options
		ro       *leveldb.ReadOptions
		iterator *leveldb.Iterator
		splitter common.KeySplitter
		decoder  common.KeyDecoder
	}

	// LevelDBBlockConsumer is a helper to store blocks in a LevelDB.
	LevelDBBlockConsumer struct {
		db      *leveldb.DB
		opts    *leveldb.Options
		wo      *leveldb.WriteOptions
		joiner  common.KeyJoiner
		encoder common.KeyEncoder
	}
)

// NewLevelDBBlockProducer returns a new helper to fetch blocks from a LevelDB.
func NewLevelDBBlockProducer(path string,
	splitter common.KeySplitter,
	decoder common.KeyDecoder) (ldbp *LevelDBBlockProducer, err error) {

	// check if we can stat it -> exists.
	if _, err = os.Stat(path); err != nil {
		return
	}

	opts := leveldb.NewOptions()
	opts.SetCreateIfMissing(false)

	var db *leveldb.DB
	if db, err = leveldb.Open(path, opts); err != nil {
		opts.Close()
		return
	}

	ro := leveldb.NewReadOptions()
	ro.SetFillCache(false)

	iterator := db.NewIterator(ro)
	iterator.SeekToFirst()

	ldbp = &LevelDBBlockProducer{
		db:       db,
		opts:     opts,
		ro:       ro,
		iterator: iterator,
		splitter: splitter,
		decoder:  decoder}
	return
}

// Close closes a helper to fetch blocks from a LevelDB.
func (ldbp *LevelDBBlockProducer) Close() error {
	if ldbp.iterator != nil {
		ldbp.iterator.Close()
	}
	ldbp.ro.Close()
	ldbp.db.Close()
	ldbp.opts.Close()
	return nil
}

// Next fetches the next block from a LevelDB.
func (ldbp *LevelDBBlockProducer) Next(block *common.Block) (err error) {
	if ldbp.iterator == nil {
		err = common.ErrNoMoreBlocks
		return
	}
	if !ldbp.iterator.Valid() {
		if err = ldbp.iterator.GetError(); err == nil {
			err = common.ErrNoMoreBlocks
		}
		ldbp.iterator.Close()
		ldbp.iterator = nil
		return
	}
	var key int64
	if key, err = ldbp.decoder(ldbp.iterator.Key()); err != nil {
		return
	}
	block.Coord = ldbp.splitter(key)
	block.Data = ldbp.iterator.Value()
	ldbp.iterator.Next()
	return
}

// NewLevelDBBlockConsumer returns a new helper to store blocks in a LevelDB.
func NewLevelDBBlockConsumer(
	path string,
	joiner common.KeyJoiner,
	encoder common.KeyEncoder) (ldbc *LevelDBBlockConsumer, err error) {

	opts := leveldb.NewOptions()
	opts.SetCreateIfMissing(true)

	var db *leveldb.DB
	if db, err = leveldb.Open(path, opts); err != nil {
		return
	}

	ldbc = &LevelDBBlockConsumer{
		db:      db,
		opts:    opts,
		wo:      leveldb.NewWriteOptions(),
		joiner:  joiner,
		encoder: encoder}
	return
}

// Close closes a helper to store blocks in a LevelDB.
func (ldbc *LevelDBBlockConsumer) Close() error {
	ldbc.wo.Close()
	ldbc.db.Close()
	ldbc.opts.Close()
	return nil
}

// Consume stores a block in LevelDB.
func (ldbc *LevelDBBlockConsumer) Consume(block *common.Block) (err error) {
	var encodedKey []byte
	if encodedKey, err = ldbc.encoder(ldbc.joiner(block.Coord)); err != nil {
		return
	}
	err = ldbc.db.Put(ldbc.wo, encodedKey, block.Data)
	return
}