unexport interfaces in mtredisalize

This commit is contained in:
Sascha L. Teichmann 2024-01-07 10:54:21 +01:00
parent 447b8fc915
commit 75cc24c04a
5 changed files with 84 additions and 84 deletions

View File

@ -5,41 +5,41 @@
package main package main
type ( type (
// Block is the essential transfer unit from to the database. // block is the essential transfer unit from to the database.
// Key is the serialized spatial position. // Key is the serialized spatial position.
// Data is the serialized from of the corresponding block data. // Data is the serialized from of the corresponding block data.
Block struct { block struct {
Key []byte Key []byte
Data []byte Data []byte
} }
// Session is a database session. // session is a database session.
Session interface { session interface {
// Del deletes a block by a given key. // del deletes a block by a given key.
Del(hash, key []byte) (bool, error) del(hash, key []byte) (bool, error)
// Fetch fetches the block data for a given position. // fetch fetches the block data for a given position.
Fetch(hash, key []byte) ([]byte, error) fetch(hash, key []byte) ([]byte, error)
// InTransaction returns true if a transaction is running. // inTransaction returns true if a transaction is running.
InTransaction() bool inTransaction() bool
// Store stores a block with a given position and data. // store stores a block with a given position and data.
Store(hash, key, value []byte) (bool, error) store(hash, key, value []byte) (bool, error)
// AllKeys returns all keys in the database. // allKeys returns all keys in the database.
AllKeys(hash []byte, done <-chan struct{}) (<-chan []byte, int, error) allKeys(hash []byte, done <-chan struct{}) (<-chan []byte, int, error)
// SpatialQuery performs a box query between the positions first and second. // spatialQuery performs a box query between the positions first and second.
SpatialQuery(hash, first, second []byte, done <-chan struct{}) (<-chan Block, error) spatialQuery(hash, first, second []byte, done <-chan struct{}) (<-chan block, error)
// BeginTransaction starts a transcation. // beginTransaction starts a transcation.
BeginTransaction() error beginTransaction() error
// CommitTransaction finishes a transaction. // commitTransaction finishes a transaction.
CommitTransaction() error commitTransaction() error
// Close closes the database session. // close closes the database session.
Close() error close() error
} }
// Backend is the interface representing a database. // backend is the interface representing a database.
Backend interface { backend interface {
// NewSession opens a new session. // newSession opens a new session.
NewSession() (Session, error) newSession() (session, error)
// Shutdown shuts down the database server. // shutdown shuts down the database server.
Shutdown() error shutdown() error
} }
) )

View File

@ -25,22 +25,22 @@ var (
type connection struct { type connection struct {
conn net.Conn conn net.Conn
session Session session session
maxBulkStringSize int64 maxBulkStringSize int64
boolArray []bool boolArray []bool
} }
func newConnection(conn net.Conn, session Session, maxBulkStringSize int64) *connection { func newConnection(conn net.Conn, sess session, maxBulkStringSize int64) *connection {
return &connection{ return &connection{
conn: conn, conn: conn,
session: session, session: sess,
maxBulkStringSize: maxBulkStringSize, maxBulkStringSize: maxBulkStringSize,
boolArray: []bool{}} boolArray: []bool{}}
} }
func (c *connection) run() { func (c *connection) run() {
defer func() { defer func() {
c.session.Close() c.session.close()
c.conn.Close() c.conn.Close()
}() }()
r := bufio.NewReaderSize(c.conn, 8*1024) r := bufio.NewReaderSize(c.conn, 8*1024)
@ -59,7 +59,7 @@ func logError(err error) bool {
func (c *connection) Hdel(hash, key []byte) bool { func (c *connection) Hdel(hash, key []byte) bool {
success, err := c.session.Del(hash, key) success, err := c.session.del(hash, key)
if err != nil { if err != nil {
return c.writeError(err) return c.writeError(err)
} }
@ -71,7 +71,7 @@ func (c *connection) Hget(hash, key []byte) bool {
var err error var err error
var data []byte var data []byte
if data, err = c.session.Fetch(hash, key); err != nil { if data, err = c.session.fetch(hash, key); err != nil {
return c.writeError(err) return c.writeError(err)
} }
@ -82,11 +82,11 @@ func (c *connection) Hset(hash, key, data []byte) bool {
var err error var err error
var exists bool var exists bool
if exists, err = c.session.Store(hash, key, data); err != nil { if exists, err = c.session.store(hash, key, data); err != nil {
return c.writeError(err) return c.writeError(err)
} }
if c.session.InTransaction() { if c.session.inTransaction() {
c.boolArray = append(c.boolArray, exists) c.boolArray = append(c.boolArray, exists)
return c.writeQueued() return c.writeQueued()
} }
@ -95,10 +95,10 @@ func (c *connection) Hset(hash, key, data []byte) bool {
} }
func (c *connection) Multi() bool { func (c *connection) Multi() bool {
if c.session.InTransaction() { if c.session.inTransaction() {
log.Println("WARN: Already running transaction.") log.Println("WARN: Already running transaction.")
} else { } else {
if err := c.session.BeginTransaction(); err != nil { if err := c.session.beginTransaction(); err != nil {
return c.writeError(err) return c.writeError(err)
} }
} }
@ -106,12 +106,12 @@ func (c *connection) Multi() bool {
} }
func (c *connection) Exec() bool { func (c *connection) Exec() bool {
if !c.session.InTransaction() { if !c.session.inTransaction() {
return c.writeEmptyArray() return c.writeEmptyArray()
} }
arr := c.boolArray arr := c.boolArray
c.boolArray = []bool{} c.boolArray = []bool{}
if err := c.session.CommitTransaction(); err != nil { if err := c.session.commitTransaction(); err != nil {
return c.writeError(err) return c.writeError(err)
} }
return c.writeBoolArray(arr) return c.writeBoolArray(arr)
@ -126,7 +126,7 @@ func (c *connection) Hkeys(hash []byte) bool {
) )
defer close(done) defer close(done)
if keys, n, err = c.session.AllKeys(hash, done); err != nil { if keys, n, err = c.session.allKeys(hash, done); err != nil {
return c.writeError(err) return c.writeError(err)
} }
@ -153,12 +153,12 @@ func (c *connection) Ping() bool {
func (c *connection) HSpatial(hash, first, second []byte) bool { func (c *connection) HSpatial(hash, first, second []byte) bool {
var ( var (
err error err error
blocks <-chan Block blocks <-chan block
done = make(chan struct{}) done = make(chan struct{})
) )
defer close(done) defer close(done)
if blocks, err = c.session.SpatialQuery(hash, first, second, done); err != nil { if blocks, err = c.session.spatialQuery(hash, first, second, done); err != nil {
return c.writeError(err) return c.writeError(err)
} }

View File

@ -75,7 +75,7 @@ func NewLeveDBBackend(
if !interleaved { if !interleaved {
if err = ldb.buildCoverage(); err != nil { if err = ldb.buildCoverage(); err != nil {
ldb.Shutdown() ldb.shutdown()
ldb = nil ldb = nil
return return
} }
@ -108,18 +108,18 @@ func (ldb *LevelDBBackend) buildCoverage() error {
return nil return nil
} }
func (ldb *LevelDBBackend) NewSession() (Session, error) { func (ldb *LevelDBBackend) newSession() (session, error) {
return &LevelDBSession{ldb, nil}, nil return &LevelDBSession{ldb, nil}, nil
} }
func (ldbs *LevelDBSession) Close() error { func (ldbs *LevelDBSession) close() error {
if ldbs.tx != nil { if ldbs.tx != nil {
ldbs.tx.Close() ldbs.tx.Close()
} }
return nil return nil
} }
func (ldb *LevelDBBackend) Shutdown() error { func (ldb *LevelDBBackend) shutdown() error {
ldb.db.Close() ldb.db.Close()
if ldb.cache != nil { if ldb.cache != nil {
ldb.cache.Close() ldb.cache.Close()
@ -127,7 +127,7 @@ func (ldb *LevelDBBackend) Shutdown() error {
return nil return nil
} }
func (ldbs *LevelDBSession) Del(_, key []byte) (success bool, err error) { func (ldbs *LevelDBSession) del(_, key []byte) (success bool, err error) {
if key, err = ldbs.backend.decoder(key); err != nil { if key, err = ldbs.backend.decoder(key); err != nil {
return return
} }
@ -149,7 +149,7 @@ func (ldbs *LevelDBSession) Del(_, key []byte) (success bool, err error) {
return return
} }
func (ldbs *LevelDBSession) Fetch(_, key []byte) (value []byte, err error) { func (ldbs *LevelDBSession) fetch(_, key []byte) (value []byte, err error) {
if key, err = ldbs.backend.decoder(key); err != nil { if key, err = ldbs.backend.decoder(key); err != nil {
return return
} }
@ -164,7 +164,7 @@ func (ldbs *LevelDBSession) Fetch(_, key []byte) (value []byte, err error) {
return return
} }
func (ldbs *LevelDBSession) InTransaction() bool { func (ldbs *LevelDBSession) inTransaction() bool {
return ldbs.tx != nil return ldbs.tx != nil
} }
@ -179,7 +179,7 @@ func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) {
return return
} }
func (ldbs *LevelDBSession) Store(_, key, value []byte) (exists bool, err error) { func (ldbs *LevelDBSession) store(_, key, value []byte) (exists bool, err error) {
origKey := key origKey := key
if key, err = ldbs.backend.decoder(key); err != nil { if key, err = ldbs.backend.decoder(key); err != nil {
return return
@ -214,12 +214,12 @@ func (ldbs *LevelDBSession) Store(_, key, value []byte) (exists bool, err error)
return return
} }
func (ldbs *LevelDBSession) BeginTransaction() error { func (ldbs *LevelDBSession) beginTransaction() error {
ldbs.tx = leveldb.NewWriteBatch() ldbs.tx = leveldb.NewWriteBatch()
return nil return nil
} }
func (ldbs *LevelDBSession) CommitTransaction() (err error) { func (ldbs *LevelDBSession) commitTransaction() (err error) {
tx := ldbs.tx tx := ldbs.tx
if tx == nil { if tx == nil {
log.Println("WARN: No transaction running.") log.Println("WARN: No transaction running.")
@ -234,7 +234,7 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) {
return return
} }
func (ldbs *LevelDBSession) AllKeys( func (ldbs *LevelDBSession) allKeys(
_ []byte, _ []byte,
done <-chan struct{}) (<-chan []byte, int, error) { done <-chan struct{}) (<-chan []byte, int, error) {
@ -282,9 +282,9 @@ func (ldbs *LevelDBSession) AllKeys(
return keys, n, nil return keys, n, nil
} }
func (ldbs *LevelDBSession) SpatialQuery( func (ldbs *LevelDBSession) spatialQuery(
_, first, second []byte, _, first, second []byte,
done <-chan struct{}) (<-chan Block, error) { done <-chan struct{}) (<-chan block, error) {
if ldbs.backend.interleaved { if ldbs.backend.interleaved {
return ldbs.interleavedSpatialQuery(first, second, done) return ldbs.interleavedSpatialQuery(first, second, done)
@ -294,7 +294,7 @@ func (ldbs *LevelDBSession) SpatialQuery(
func (ldbs *LevelDBSession) plainSpatialQuery( func (ldbs *LevelDBSession) plainSpatialQuery(
first, second []byte, first, second []byte,
done <-chan struct{}) (<-chan Block, error) { done <-chan struct{}) (<-chan block, error) {
var ( var (
firstKey int64 firstKey int64
@ -311,7 +311,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
c2 := common.PlainToCoord(secondKey) c2 := common.PlainToCoord(secondKey)
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks := make(chan Block) blocks := make(chan block)
go func() { go func() {
defer close(blocks) defer close(blocks)
@ -339,7 +339,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
} }
if value != nil { if value != nil {
select { select {
case blocks <- Block{Key: key, Data: value}: case blocks <- block{Key: key, Data: value}:
case <-done: case <-done:
return return
} }
@ -354,7 +354,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
func (ldbs *LevelDBSession) interleavedSpatialQuery( func (ldbs *LevelDBSession) interleavedSpatialQuery(
first, second []byte, first, second []byte,
done <-chan struct{}) (<-chan Block, error) { done <-chan struct{}) (<-chan block, error) {
var ( var (
firstKey int64 firstKey int64
@ -371,7 +371,7 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery(
c2 := common.ClipCoord(common.PlainToCoord(secondKey)) c2 := common.ClipCoord(common.PlainToCoord(secondKey))
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks := make(chan Block) blocks := make(chan block)
go func() { go func() {
defer close(blocks) defer close(blocks)
@ -407,7 +407,7 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery(
return return
} }
select { select {
case blocks <- Block{Key: encodedKey, Data: it.Value()}: case blocks <- block{Key: encodedKey, Data: it.Value()}:
case <-done: case <-done:
return return
} }

View File

@ -75,7 +75,7 @@ func main() {
var ( var (
err error err error
backend Backend backend backend
gcDur time.Duration gcDur time.Duration
chDur time.Duration chDur time.Duration
changeTracker *changeTracker changeTracker *changeTracker
@ -115,7 +115,7 @@ func main() {
} }
} }
defer backend.Shutdown() defer backend.shutdown()
var listener net.Listener var listener net.Listener
@ -155,8 +155,8 @@ func main() {
for { for {
select { select {
case conn := <-connChan: case conn := <-connChan:
var session Session var session session
if session, err = backend.NewSession(); err != nil { if session, err = backend.newSession(); err != nil {
log.Printf("Cannot create session: %s\n", err) log.Printf("Cannot create session: %s\n", err)
conn.Close() conn.Close()
} else { } else {

View File

@ -49,11 +49,11 @@ type SQLiteSession struct {
tx *sql.Tx tx *sql.Tx
} }
func (sqlb *SQLiteBackend) NewSession() (Session, error) { func (sqlb *SQLiteBackend) newSession() (session, error) {
return &SQLiteSession{sqlb, nil}, nil return &SQLiteSession{sqlb, nil}, nil
} }
func (ss *SQLiteSession) Close() error { func (ss *SQLiteSession) close() error {
t := ss.tx t := ss.tx
if t != nil { if t != nil {
ss.tx = nil ss.tx = nil
@ -182,7 +182,7 @@ func (sqlb *SQLiteBackend) closeAll() error {
return closeDB(&sqlb.db) return closeDB(&sqlb.db)
} }
func (sqlb *SQLiteBackend) Shutdown() error { func (sqlb *SQLiteBackend) shutdown() error {
globalLock.Lock() globalLock.Lock()
defer globalLock.Unlock() defer globalLock.Unlock()
@ -196,7 +196,7 @@ func (ss *SQLiteSession) txStmt(stmt *sql.Stmt) *sql.Stmt {
return stmt return stmt
} }
func (ss *SQLiteSession) Del(_, key []byte) (success bool, err error) { func (ss *SQLiteSession) del(_, key []byte) (success bool, err error) {
var pos int64 var pos int64
if pos, err = ss.backend.decoder(key); err != nil { if pos, err = ss.backend.decoder(key); err != nil {
return return
@ -226,7 +226,7 @@ func (ss *SQLiteSession) Del(_, key []byte) (success bool, err error) {
return return
} }
func (ss *SQLiteSession) Fetch(_, key []byte) (data []byte, err error) { func (ss *SQLiteSession) fetch(_, key []byte) (data []byte, err error) {
var pos int64 var pos int64
if pos, err = ss.backend.decoder(key); err != nil { if pos, err = ss.backend.decoder(key); err != nil {
return return
@ -244,11 +244,11 @@ func (ss *SQLiteSession) Fetch(_, key []byte) (data []byte, err error) {
return return
} }
func (ss *SQLiteSession) InTransaction() bool { func (ss *SQLiteSession) inTransaction() bool {
return ss.tx != nil return ss.tx != nil
} }
func (ss *SQLiteSession) Store(_, key, value []byte) (exists bool, err error) { func (ss *SQLiteSession) store(_, key, value []byte) (exists bool, err error) {
var pos int64 var pos int64
if pos, err = ss.backend.decoder(key); err != nil { if pos, err = ss.backend.decoder(key); err != nil {
return return
@ -295,7 +295,7 @@ func (ss *SQLiteSession) Store(_, key, value []byte) (exists bool, err error) {
return return
} }
func (ss *SQLiteSession) BeginTransaction() (err error) { func (ss *SQLiteSession) beginTransaction() (err error) {
if ss.tx != nil { if ss.tx != nil {
log.Println("WARN: Already running transaction.") log.Println("WARN: Already running transaction.")
return nil return nil
@ -307,7 +307,7 @@ func (ss *SQLiteSession) BeginTransaction() (err error) {
return return
} }
func (ss *SQLiteSession) CommitTransaction() error { func (ss *SQLiteSession) commitTransaction() error {
tx := ss.tx tx := ss.tx
if tx == nil { if tx == nil {
@ -321,7 +321,7 @@ func (ss *SQLiteSession) CommitTransaction() error {
return tx.Commit() return tx.Commit()
} }
func (ss *SQLiteSession) AllKeys( func (ss *SQLiteSession) allKeys(
_ []byte, _ []byte,
done <-chan struct{}) (<-chan []byte, int, error) { done <-chan struct{}) (<-chan []byte, int, error) {
globalLock.RLock() globalLock.RLock()
@ -372,9 +372,9 @@ func (ss *SQLiteSession) AllKeys(
return keys, n, nil return keys, n, nil
} }
func (ss *SQLiteSession) SpatialQuery( func (ss *SQLiteSession) spatialQuery(
_, first, second []byte, _, first, second []byte,
done <-chan struct{}) (<-chan Block, error) { done <-chan struct{}) (<-chan block, error) {
if ss.backend.interleaved { if ss.backend.interleaved {
return ss.interleavedSpatialQuery(first, second, done) return ss.interleavedSpatialQuery(first, second, done)
@ -385,7 +385,7 @@ func (ss *SQLiteSession) SpatialQuery(
func (ss *SQLiteSession) interleavedSpatialQuery( func (ss *SQLiteSession) interleavedSpatialQuery(
first, second []byte, first, second []byte,
done <-chan struct{}) (<-chan Block, error) { done <-chan struct{}) (<-chan block, error) {
var ( var (
firstKey int64 firstKey int64
@ -402,7 +402,7 @@ func (ss *SQLiteSession) interleavedSpatialQuery(
c2 := common.ClipCoord(common.PlainToCoord(secondKey)) c2 := common.ClipCoord(common.PlainToCoord(secondKey))
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks := make(chan Block) blocks := make(chan block)
globalLock.RLock() globalLock.RLock()
@ -437,7 +437,7 @@ func (ss *SQLiteSession) interleavedSpatialQuery(
key := common.StringToBytes(common.CoordToPlain(c)) key := common.StringToBytes(common.CoordToPlain(c))
//fmt.Printf("sending: %q\n", c) //fmt.Printf("sending: %q\n", c)
select { select {
case blocks <- Block{Key: key, Data: data}: case blocks <- block{Key: key, Data: data}:
case <-done: case <-done:
return return
} }
@ -465,7 +465,7 @@ func (ss *SQLiteSession) interleavedSpatialQuery(
func (ss *SQLiteSession) plainSpatialQuery( func (ss *SQLiteSession) plainSpatialQuery(
first, second []byte, first, second []byte,
done <-chan struct{}) (<-chan Block, error) { done <-chan struct{}) (<-chan block, error) {
var ( var (
firstKey int64 firstKey int64
@ -482,7 +482,7 @@ func (ss *SQLiteSession) plainSpatialQuery(
c2 := common.PlainToCoord(secondKey) c2 := common.PlainToCoord(secondKey)
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks := make(chan Block) blocks := make(chan block)
globalLock.RLock() globalLock.RLock()
@ -511,7 +511,7 @@ func (ss *SQLiteSession) plainSpatialQuery(
return false return false
} }
select { select {
case blocks <- Block{Key: encodedKey, Data: data}: case blocks <- block{Key: encodedKey, Data: data}:
case <-done: case <-done:
return false return false
} }