// Copyright 2014 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "database/sql" "log" "sync" _ "github.com/mattn/go-sqlite3" "bitbucket.org/s_l_teichmann/mtredisalize/common" ) var globalLock sync.RWMutex const ( fetchSql = "SELECT data FROM blocks WHERE pos = ?" existsSql = "SELECT 1 FROM blocks WHERE pos = ?" updateSql = "UPDATE blocks SET data = ? WHERE pos = ?" insertSql = "INSERT INTO blocks (pos, data) VALUES (?, ?)" countSql = "SELECT count(*) FROM blocks" keysSql = "SELECT pos FROM blocks" ) type SqliteBackend struct { db *sql.DB encoder common.KeyEncoder decoder common.KeyDecoder interleaved bool existsStmt *sql.Stmt fetchStmt *sql.Stmt insertStmt *sql.Stmt updateStmt *sql.Stmt countStmt *sql.Stmt keysStmt *sql.Stmt } type SqliteSession struct { backend *SqliteBackend tx *sql.Tx } func (ss *SqliteBackend) NewSession() (Session, error) { return &SqliteSession{ss, nil}, nil } func (ss *SqliteSession) Close() error { t := ss.tx if t != nil { ss.tx = nil return t.Rollback() } return nil } func NewSqliteBackend(path string, interleaved bool) (sqlb *SqliteBackend, err error) { res := SqliteBackend{interleaved: interleaved} if res.db, err = sql.Open("sqlite3", path); err != nil { return } if res.existsStmt, err = res.db.Prepare(existsSql); err != nil { res.closeAll() return } if res.fetchStmt, err = res.db.Prepare(fetchSql); err != nil { res.closeAll() return } if res.insertStmt, err = res.db.Prepare(insertSql); err != nil { res.closeAll() return } if res.updateStmt, err = res.db.Prepare(updateSql); err != nil { res.closeAll() return } if res.countStmt, err = res.db.Prepare(countSql); err != nil { res.closeAll() return } if res.keysStmt, err = res.db.Prepare(keysSql); err != nil { res.closeAll() return } if interleaved { res.encoder = common.EncodeStringToBytesFromInterleaved res.decoder = common.DecodeStringFromBytesToInterleaved } else { res.encoder = common.EncodeStringToBytes res.decoder = common.DecodeStringFromBytes } sqlb = &res return } func closeStmt(stmt **sql.Stmt) error { s := *stmt if s != nil { *stmt = nil return s.Close() } return nil } func closeDB(db **sql.DB) error { d := *db if d != nil { *db = nil return d.Close() } return nil } func (sqlb *SqliteBackend) closeAll() error { closeStmt(&sqlb.fetchStmt) closeStmt(&sqlb.insertStmt) closeStmt(&sqlb.updateStmt) closeStmt(&sqlb.existsStmt) closeStmt(&sqlb.countStmt) closeStmt(&sqlb.keysStmt) return closeDB(&sqlb.db) } func (sqlb *SqliteBackend) Shutdown() error { globalLock.Lock() defer globalLock.Unlock() return sqlb.closeAll() } func (ss *SqliteSession) txStmt(stmt *sql.Stmt) *sql.Stmt { if ss.tx != nil { return ss.tx.Stmt(stmt) } return stmt } func (ss *SqliteSession) Fetch(hash, key []byte) (data []byte, err error) { var pos int64 if pos, err = ss.backend.decoder(key); err != nil { return } globalLock.RLock() defer globalLock.RUnlock() fetchStmt := ss.txStmt(ss.backend.fetchStmt) err2 := fetchStmt.QueryRow(pos).Scan(&data) if err2 == sql.ErrNoRows { return } err = err2 return } func (ss *SqliteSession) InTransaction() bool { return ss.tx != nil } func (ss *SqliteSession) Store(hash, key, value []byte) (exists bool, err error) { var pos int64 if pos, err = ss.backend.decoder(key); err != nil { return } globalLock.Lock() defer globalLock.Unlock() existsStmt := ss.txStmt(ss.backend.existsStmt) var x int err2 := existsStmt.QueryRow(pos).Scan(&x) if err2 == sql.ErrNoRows { exists = false } else if err2 != nil { err = err2 return } else { exists = true } if exists { updateStmt := ss.txStmt(ss.backend.updateStmt) _, err = updateStmt.Exec(value, pos) } else { insertStmt := ss.txStmt(ss.backend.insertStmt) _, err = insertStmt.Exec(pos, value) } return } func (ss *SqliteSession) BeginTransaction() (err error) { if ss.tx != nil { log.Println("WARN: Already running transaction.") return nil } globalLock.Lock() defer globalLock.Unlock() ss.tx, err = ss.backend.db.Begin() return } func (ss *SqliteSession) CommitTransaction() error { tx := ss.tx if tx == nil { log.Println("WARN: No transaction running.") return nil } globalLock.Lock() defer globalLock.Unlock() ss.tx = nil return tx.Commit() } func (ss *SqliteSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) { globalLock.RLock() countStmt := ss.txStmt(ss.backend.countStmt) if err = countStmt.QueryRow().Scan(&n); err != nil { if err == sql.ErrNoRows { err = nil } globalLock.RUnlock() return } keysStmt := ss.txStmt(ss.backend.keysStmt) var rows *sql.Rows if rows, err = keysStmt.Query(); err != nil { globalLock.RUnlock() return } keys = make(chan []byte) go func() { defer globalLock.RUnlock() defer rows.Close() defer close(keys) var err error for rows.Next() { var key int64 if err := rows.Scan(&key); err != nil { log.Printf("WARN: %s", err) break } var encoded []byte if encoded, err = ss.backend.encoder(key); err != nil { log.Printf("Cannot encode key: %d %s\n", key, err) break } select { case keys <- encoded: case <-done: return } } }() return } func (ss *SqliteSession) SpatialQuery(hash, first, second []byte, done chan struct{}) (blocks chan Block, err error) { err = ErrNotImplemented return }