// Copyright 2014 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "database/sql" "log" "sync" _ "github.com/mattn/go-sqlite3" "bitbucket.org/s_l_teichmann/mtredisalize/common" ) var globalLock sync.RWMutex const ( fetchSql = "SELECT data FROM blocks WHERE pos = ?" existsSql = "SELECT 1 FROM blocks WHERE pos = ?" updateSql = "UPDATE blocks SET data = ? WHERE pos = ?" insertSql = "INSERT INTO blocks (pos, data) VALUES (?, ?)" countSql = "SELECT count(*) FROM blocks" keysSql = "SELECT pos FROM blocks" rangeSql = "SELECT pos, data FROM blocks WHERE pos BETWEEN ? AND ? ORDER BY pos" ) type SqliteBackend struct { db *sql.DB encoder common.KeyEncoder decoder common.KeyDecoder interleaved bool existsStmt *sql.Stmt fetchStmt *sql.Stmt insertStmt *sql.Stmt updateStmt *sql.Stmt countStmt *sql.Stmt keysStmt *sql.Stmt rangeStmt *sql.Stmt } type SqliteSession struct { backend *SqliteBackend tx *sql.Tx } func (ss *SqliteBackend) NewSession() (Session, error) { return &SqliteSession{ss, nil}, nil } func (ss *SqliteSession) Close() error { t := ss.tx if t != nil { ss.tx = nil return t.Rollback() } return nil } func NewSqliteBackend(path string, interleaved bool) (sqlb *SqliteBackend, err error) { res := SqliteBackend{interleaved: interleaved} if res.db, err = sql.Open("sqlite3", path); err != nil { return } if res.existsStmt, err = res.db.Prepare(existsSql); err != nil { res.closeAll() return } if res.fetchStmt, err = res.db.Prepare(fetchSql); err != nil { res.closeAll() return } if res.insertStmt, err = res.db.Prepare(insertSql); err != nil { res.closeAll() return } if res.updateStmt, err = res.db.Prepare(updateSql); err != nil { res.closeAll() return } if res.countStmt, err = res.db.Prepare(countSql); err != nil { res.closeAll() return } if res.keysStmt, err = res.db.Prepare(keysSql); err != nil { res.closeAll() return } if res.rangeStmt, err = res.db.Prepare(rangeSql); err != nil { res.closeAll() return } if interleaved { res.encoder = common.EncodeStringToBytesFromInterleaved res.decoder = common.DecodeStringFromBytesToInterleaved } else { res.encoder = common.EncodeStringToBytes res.decoder = common.DecodeStringFromBytes } sqlb = &res return } func closeStmt(stmt **sql.Stmt) error { s := *stmt if s != nil { *stmt = nil return s.Close() } return nil } func closeDB(db **sql.DB) error { d := *db if d != nil { *db = nil return d.Close() } return nil } func (sqlb *SqliteBackend) closeAll() error { closeStmt(&sqlb.fetchStmt) closeStmt(&sqlb.insertStmt) closeStmt(&sqlb.updateStmt) closeStmt(&sqlb.existsStmt) closeStmt(&sqlb.countStmt) closeStmt(&sqlb.keysStmt) closeStmt(&sqlb.rangeStmt) return closeDB(&sqlb.db) } func (sqlb *SqliteBackend) Shutdown() error { globalLock.Lock() defer globalLock.Unlock() return sqlb.closeAll() } func (ss *SqliteSession) txStmt(stmt *sql.Stmt) *sql.Stmt { if ss.tx != nil { return ss.tx.Stmt(stmt) } return stmt } func (ss *SqliteSession) Fetch(hash, key []byte) (data []byte, err error) { var pos int64 if pos, err = ss.backend.decoder(key); err != nil { return } globalLock.RLock() defer globalLock.RUnlock() fetchStmt := ss.txStmt(ss.backend.fetchStmt) err2 := fetchStmt.QueryRow(pos).Scan(&data) if err2 == sql.ErrNoRows { return } err = err2 return } func (ss *SqliteSession) InTransaction() bool { return ss.tx != nil } func (ss *SqliteSession) Store(hash, key, value []byte) (exists bool, err error) { var pos int64 if pos, err = ss.backend.decoder(key); err != nil { return } globalLock.Lock() defer globalLock.Unlock() existsStmt := ss.txStmt(ss.backend.existsStmt) var x int err2 := existsStmt.QueryRow(pos).Scan(&x) if err2 == sql.ErrNoRows { exists = false } else if err2 != nil { err = err2 return } else { exists = true } if exists { updateStmt := ss.txStmt(ss.backend.updateStmt) _, err = updateStmt.Exec(value, pos) } else { insertStmt := ss.txStmt(ss.backend.insertStmt) _, err = insertStmt.Exec(pos, value) } return } func (ss *SqliteSession) BeginTransaction() (err error) { if ss.tx != nil { log.Println("WARN: Already running transaction.") return nil } globalLock.Lock() defer globalLock.Unlock() ss.tx, err = ss.backend.db.Begin() return } func (ss *SqliteSession) CommitTransaction() error { tx := ss.tx if tx == nil { log.Println("WARN: No transaction running.") return nil } globalLock.Lock() defer globalLock.Unlock() ss.tx = nil return tx.Commit() } func (ss *SqliteSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) { globalLock.RLock() countStmt := ss.txStmt(ss.backend.countStmt) if err = countStmt.QueryRow().Scan(&n); err != nil { if err == sql.ErrNoRows { err = nil } globalLock.RUnlock() return } keysStmt := ss.txStmt(ss.backend.keysStmt) var rows *sql.Rows if rows, err = keysStmt.Query(); err != nil { globalLock.RUnlock() return } keys = make(chan []byte) go func() { defer globalLock.RUnlock() defer rows.Close() defer close(keys) var err error for rows.Next() { var key int64 if err := rows.Scan(&key); err != nil { log.Printf("WARN: %s", err) break } var encoded []byte if encoded, err = ss.backend.encoder(key); err != nil { log.Printf("Cannot encode key: %d %s\n", key, err) break } select { case keys <- encoded: case <-done: return } } }() return } func (ss *SqliteSession) SpatialQuery(hash, first, second []byte, done chan struct{}) (chan Block, error) { if ss.backend.interleaved { return ss.interleavedSpatialQuery(first, second, done) } return ss.plainSpatialQuery(first, second, done) } func (ss *SqliteSession) interleavedSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) { var ( firstKey int64 secondKey int64 ) if firstKey, err = common.DecodeStringFromBytes(first); err != nil { return } if secondKey, err = common.DecodeStringFromBytes(second); err != nil { return } c1 := common.ClipCoord(common.PlainToCoord(firstKey)) c2 := common.ClipCoord(common.PlainToCoord(secondKey)) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) blocks = make(chan Block) globalLock.RLock() go func() { defer close(blocks) defer globalLock.RUnlock() zmin, zmax := common.CoordToInterleaved(c1), common.CoordToInterleaved(c2) // Should not be necessary. zmin, zmax = order(zmin, zmax) cub := common.Cuboid{P1: c1, P2: c2} rangeStmt := ss.txStmt(ss.backend.rangeStmt) OUTER: for { var ( err error rows *sql.Rows ) if rows, err = rangeStmt.Query(zmin, zmax); err != nil { log.Printf("Error in range query: %s", err) return } for rows.Next() { var zcode int64 var data []byte if err = rows.Scan(&zcode, &data); err != nil { log.Printf("Error in scanning row: %s", err) rows.Close() return } c := common.InterleavedToCoord(zcode) if cub.Contains(c) { var encodedKey []byte if encodedKey, err = common.EncodeStringToBytes(common.CoordToPlain(c)); err != nil { log.Printf("Key encoding failed: %s", err) rows.Close() return } select { case blocks <- Block{Key: encodedKey, Data: data}: case <-done: rows.Close() return } } else { // Left the cuboid rows.Close() zmin = common.BigMin(zmin, zmax, zcode) continue OUTER } } if err = rows.Err(); err != nil { log.Printf("Error in range query: %s", err) } rows.Close() } }() return } func (ss *SqliteSession) plainSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) { var ( firstKey int64 secondKey int64 ) if firstKey, err = common.DecodeStringFromBytes(first); err != nil { return } if secondKey, err = common.DecodeStringFromBytes(second); err != nil { return } c1 := common.PlainToCoord(firstKey) c2 := common.PlainToCoord(secondKey) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) blocks = make(chan Block) globalLock.RLock() go func() { defer globalLock.RUnlock() defer close(blocks) rangeStmt := ss.txStmt(ss.backend.rangeStmt) a, b := common.Coord{X: c1.X}, common.Coord{X: c2.X} for a.Z = c1.Z; a.Z <= c2.Z; a.Z++ { b.Z = a.Z for a.Y = c1.Y; a.Y <= c2.Y; a.Y++ { b.Y = a.Y var ( err error rows *sql.Rows ) // Ordering should not be necessary. from, to := order(common.CoordToPlain(a), common.CoordToPlain(b)) if rows, err = rangeStmt.Query(from, to); err != nil { log.Printf("Error in range query: %s", err) return } for rows.Next() { var key int64 var data []byte if err = rows.Scan(&key, &data); err != nil { log.Printf("Error in scanning row: %s", err) break } var encodedKey []byte if encodedKey, err = common.EncodeStringToBytes(key); err != nil { log.Printf("Key encoding failed: %s", err) break } select { case blocks <- Block{Key: encodedKey, Data: data}: case <-done: rows.Close() return } } if err = rows.Err(); err != nil { log.Printf("Error in range query: %s", err) } rows.Close() } } }() return }