mtsatellite/sqlite.go

271 lines
5.0 KiB
Go
Raw Normal View History

2014-08-03 15:59:56 +02:00
// Copyright 2014 by Sascha L. Teichmann
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
2014-08-03 14:52:24 +02:00
package main
import (
"database/sql"
"log"
"sync"
_ "github.com/mattn/go-sqlite3"
2014-08-16 16:06:42 +02:00
"bitbucket.org/s_l_teichmann/mtredisalize/common"
2014-08-03 14:52:24 +02:00
)
// globalLock guards every database access made through this file.
// Fetch and AllKeys take it for reading; Store, BeginTransaction,
// CommitTransaction and Shutdown take it for writing.
var globalLock sync.RWMutex
// SQL statements used against the blocks table. NewSqliteBackend prepares
// each of them once and keeps the resulting statement on the backend.
const (
	// Reads.
	fetchSql  = "SELECT data FROM blocks WHERE pos = ?"
	existsSql = "SELECT 1 FROM blocks WHERE pos = ?"
	countSql  = "SELECT count(*) FROM blocks"
	keysSql   = "SELECT pos FROM blocks"

	// Writes.
	updateSql = "UPDATE blocks SET data = ? WHERE pos = ?"
	insertSql = "INSERT INTO blocks (pos, data) VALUES (?, ?)"
)
type SqliteBackend struct {
db *sql.DB
encoder common.KeyEncoder
decoder common.KeyDecoder
2014-08-03 14:52:24 +02:00
existsStmt *sql.Stmt
fetchStmt *sql.Stmt
insertStmt *sql.Stmt
updateStmt *sql.Stmt
countStmt *sql.Stmt
keysStmt *sql.Stmt
2014-08-03 14:52:24 +02:00
}
// SqliteSession is a session on top of a SqliteBackend. It optionally
// carries one running transaction.
type SqliteSession struct {
// backend provides the database handle, prepared statements and key codecs.
backend *SqliteBackend
// tx is the currently running transaction, or nil when none is active.
tx *sql.Tx
}
// NewSession creates a session bound to this backend. The session starts
// without a running transaction. The returned error is always nil.
func (sqlb *SqliteBackend) NewSession() (Session, error) {
	session := &SqliteSession{backend: sqlb}
	return session, nil
}
// Close ends the session. If a transaction is still running it is detached
// from the session and rolled back, and the rollback error is returned.
// Without a running transaction Close does nothing and returns nil.
func (ss *SqliteSession) Close() error {
	if ss.tx == nil {
		return nil
	}
	pending := ss.tx
	// Clear the field first so the session never refers to a finished tx.
	ss.tx = nil
	return pending.Rollback()
}
func NewSqliteBackend(
path string,
encoder common.KeyEncoder,
decoder common.KeyDecoder) (sqlb *SqliteBackend, err error) {
2014-08-03 14:52:24 +02:00
res := SqliteBackend{}
if res.db, err = sql.Open("sqlite3", path); err != nil {
return
}
if res.existsStmt, err = res.db.Prepare(existsSql); err != nil {
res.closeAll()
return
}
if res.fetchStmt, err = res.db.Prepare(fetchSql); err != nil {
res.closeAll()
return
}
if res.insertStmt, err = res.db.Prepare(insertSql); err != nil {
res.closeAll()
return
}
if res.updateStmt, err = res.db.Prepare(updateSql); err != nil {
res.closeAll()
return
}
if res.countStmt, err = res.db.Prepare(countSql); err != nil {
res.closeAll()
return
}
if res.keysStmt, err = res.db.Prepare(keysSql); err != nil {
res.closeAll()
return
}
res.encoder = encoder
res.decoder = decoder
2014-08-03 14:52:24 +02:00
sqlb = &res
return
}
// closeStmt closes the statement that stmt points to and resets the pointed-to
// variable to nil, so a second call is a no-op. It returns nil when there is
// nothing to close, otherwise the result of closing the statement.
func closeStmt(stmt **sql.Stmt) error {
	if *stmt == nil {
		return nil
	}
	open := *stmt
	*stmt = nil
	return open.Close()
}
// closeDB closes the database handle that db points to and resets the
// pointed-to variable to nil, so a second call is a no-op. It returns nil
// when there is nothing to close, otherwise the result of closing the handle.
func closeDB(db **sql.DB) error {
	if *db == nil {
		return nil
	}
	handle := *db
	*db = nil
	return handle.Close()
}
func (sqlb *SqliteBackend) closeAll() error {
closeStmt(&sqlb.fetchStmt)
closeStmt(&sqlb.insertStmt)
closeStmt(&sqlb.updateStmt)
closeStmt(&sqlb.existsStmt)
closeStmt(&sqlb.countStmt)
closeStmt(&sqlb.keysStmt)
2014-08-03 14:52:24 +02:00
return closeDB(&sqlb.db)
}
// Shutdown closes all prepared statements and the database handle while
// holding the global write lock, and returns the result of closeAll.
func (sqlb *SqliteBackend) Shutdown() error {
	globalLock.Lock()
	defer globalLock.Unlock()

	err := sqlb.closeAll()
	return err
}
func (ss *SqliteSession) txStmt(stmt *sql.Stmt) *sql.Stmt {
if ss.tx != nil {
return ss.tx.Stmt(stmt)
2014-08-03 14:52:24 +02:00
}
return stmt
}
func (ss *SqliteSession) Fetch(hash, key []byte) (data []byte, err error) {
2014-08-03 14:52:24 +02:00
var pos int64
if pos, err = ss.backend.decoder(key); err != nil {
2014-08-03 14:52:24 +02:00
return
}
globalLock.RLock()
defer globalLock.RUnlock()
fetchStmt := ss.txStmt(ss.backend.fetchStmt)
2014-08-03 14:52:24 +02:00
err2 := fetchStmt.QueryRow(pos).Scan(&data)
if err2 == sql.ErrNoRows {
return
}
err = err2
return
}
func (ss *SqliteSession) InTransaction() bool {
return ss.tx != nil
2014-08-03 14:52:24 +02:00
}
func (ss *SqliteSession) Store(hash, key, value []byte) (exists bool, err error) {
2014-08-03 14:52:24 +02:00
var pos int64
if pos, err = ss.backend.decoder(key); err != nil {
2014-08-03 14:52:24 +02:00
return
}
globalLock.Lock()
defer globalLock.Unlock()
existsStmt := ss.txStmt(ss.backend.existsStmt)
2014-08-03 14:52:24 +02:00
var x int
err2 := existsStmt.QueryRow(pos).Scan(&x)
if err2 == sql.ErrNoRows {
exists = false
} else if err2 != nil {
err = err2
return
} else {
exists = true
}
if exists {
updateStmt := ss.txStmt(ss.backend.updateStmt)
2014-08-03 14:52:24 +02:00
_, err = updateStmt.Exec(value, pos)
} else {
insertStmt := ss.txStmt(ss.backend.insertStmt)
2014-08-03 14:52:24 +02:00
_, err = insertStmt.Exec(pos, value)
}
return
}
func (ss *SqliteSession) BeginTransaction() (err error) {
if ss.tx != nil {
2014-08-03 14:52:24 +02:00
log.Println("WARN: Already running transaction.")
return nil
}
globalLock.Lock()
defer globalLock.Unlock()
ss.tx, err = ss.backend.db.Begin()
2014-08-03 14:52:24 +02:00
return
}
func (ss *SqliteSession) CommitTransaction() error {
2014-08-03 14:52:24 +02:00
tx := ss.tx
2014-08-03 14:52:24 +02:00
if tx == nil {
log.Println("WARN: No transaction running.")
return nil
}
globalLock.Lock()
defer globalLock.Unlock()
ss.tx = nil
2014-08-03 14:52:24 +02:00
return tx.Commit()
}
// AllKeys returns the number of rows in the blocks table and a channel that
// delivers every stored position, converted with the backend's encoder.
//
// n comes from a count(*) query that runs before the key query. Both run
// inside the session's transaction if one is running. If the count query
// fails, or reports no rows, keys is nil; a "no rows" result is not treated
// as an error. The hash argument is not used by this backend.
//
// Keys are produced by a goroutine that sends on an unbuffered channel and
// closes it when done. The global read lock taken here is released only when
// that goroutine finishes, so the caller has to drain the channel; otherwise
// the goroutine stays blocked on the send and the lock stays held.
//
// Failures while producing keys (scan, encode, or row iteration errors) end
// the stream early and are logged; they are not reported through err because
// the function has already returned by then.
func (ss *SqliteSession) AllKeys(hash []byte) (keys chan []byte, n int, err error) {
	globalLock.RLock()

	countStmt := ss.txStmt(ss.backend.countStmt)
	if err = countStmt.QueryRow().Scan(&n); err != nil {
		if err == sql.ErrNoRows {
			err = nil
		}
		globalLock.RUnlock()
		return
	}

	keysStmt := ss.txStmt(ss.backend.keysStmt)
	var rows *sql.Rows
	if rows, err = keysStmt.Query(); err != nil {
		globalLock.RUnlock()
		return
	}

	keys = make(chan []byte)
	go func() {
		// Deferred calls run in reverse order: close the channel, release
		// the rows, then give up the read lock taken by AllKeys.
		defer globalLock.RUnlock()
		defer rows.Close()
		defer close(keys)

		// Errors below are deliberately local to the goroutine; the named
		// result err of AllKeys must not be written after it has returned.
		for rows.Next() {
			var key int64
			if scanErr := rows.Scan(&key); scanErr != nil {
				log.Printf("WARN: %s", scanErr)
				return
			}
			var encoded []byte
			var encErr error
			if encoded, encErr = ss.backend.encoder(key); encErr != nil {
				log.Printf("Cannot encode key: %d %s\n", key, encErr)
				return
			}
			keys <- encoded
		}

		// rows.Next returning false can mean either "done" or "failed";
		// rows.Err distinguishes the two.
		if iterErr := rows.Err(); iterErr != nil {
			log.Printf("WARN: %s", iterErr)
		}
	}()
	return
}