diff --git a/cmd/mtredisalize/backend.go b/cmd/mtredisalize/backend.go index efc076e..2fec6c3 100644 --- a/cmd/mtredisalize/backend.go +++ b/cmd/mtredisalize/backend.go @@ -5,41 +5,41 @@ package main type ( - // Block is the essential transfer unit from to the database. + // block is the essential transfer unit from to the database. // Key is the serialized spatial position. // Data is the serialized from of the corresponding block data. - Block struct { + block struct { Key []byte Data []byte } - // Session is a database session. - Session interface { - // Del deletes a block by a given key. - Del(hash, key []byte) (bool, error) - // Fetch fetches the block data for a given position. - Fetch(hash, key []byte) ([]byte, error) - // InTransaction returns true if a transaction is running. - InTransaction() bool - // Store stores a block with a given position and data. - Store(hash, key, value []byte) (bool, error) - // AllKeys returns all keys in the database. - AllKeys(hash []byte, done <-chan struct{}) (<-chan []byte, int, error) - // SpatialQuery performs a box query between the positions first and second. - SpatialQuery(hash, first, second []byte, done <-chan struct{}) (<-chan Block, error) - // BeginTransaction starts a transcation. - BeginTransaction() error - // CommitTransaction finishes a transaction. - CommitTransaction() error - // Close closes the database session. - Close() error + // session is a database session. + session interface { + // del deletes a block by a given key. + del(hash, key []byte) (bool, error) + // fetch fetches the block data for a given position. + fetch(hash, key []byte) ([]byte, error) + // inTransaction returns true if a transaction is running. + inTransaction() bool + // store stores a block with a given position and data. + store(hash, key, value []byte) (bool, error) + // allKeys returns all keys in the database. + allKeys(hash []byte, done <-chan struct{}) (<-chan []byte, int, error) + // spatialQuery performs a box query between the positions first and second. + spatialQuery(hash, first, second []byte, done <-chan struct{}) (<-chan block, error) + // beginTransaction starts a transcation. + beginTransaction() error + // commitTransaction finishes a transaction. + commitTransaction() error + // close closes the database session. + close() error } - // Backend is the interface representing a database. - Backend interface { - // NewSession opens a new session. - NewSession() (Session, error) - // Shutdown shuts down the database server. - Shutdown() error + // backend is the interface representing a database. + backend interface { + // newSession opens a new session. + newSession() (session, error) + // shutdown shuts down the database server. + shutdown() error } ) diff --git a/cmd/mtredisalize/connection.go b/cmd/mtredisalize/connection.go index 4d10546..f75ee60 100644 --- a/cmd/mtredisalize/connection.go +++ b/cmd/mtredisalize/connection.go @@ -25,22 +25,22 @@ var ( type connection struct { conn net.Conn - session Session + session session maxBulkStringSize int64 boolArray []bool } -func newConnection(conn net.Conn, session Session, maxBulkStringSize int64) *connection { +func newConnection(conn net.Conn, sess session, maxBulkStringSize int64) *connection { return &connection{ conn: conn, - session: session, + session: sess, maxBulkStringSize: maxBulkStringSize, boolArray: []bool{}} } func (c *connection) run() { defer func() { - c.session.Close() + c.session.close() c.conn.Close() }() r := bufio.NewReaderSize(c.conn, 8*1024) @@ -59,7 +59,7 @@ func logError(err error) bool { func (c *connection) Hdel(hash, key []byte) bool { - success, err := c.session.Del(hash, key) + success, err := c.session.del(hash, key) if err != nil { return c.writeError(err) } @@ -71,7 +71,7 @@ func (c *connection) Hget(hash, key []byte) bool { var err error var data []byte - if data, err = c.session.Fetch(hash, key); err != nil { + if data, err = c.session.fetch(hash, key); err != nil { return c.writeError(err) } @@ -82,11 +82,11 @@ func (c *connection) Hset(hash, key, data []byte) bool { var err error var exists bool - if exists, err = c.session.Store(hash, key, data); err != nil { + if exists, err = c.session.store(hash, key, data); err != nil { return c.writeError(err) } - if c.session.InTransaction() { + if c.session.inTransaction() { c.boolArray = append(c.boolArray, exists) return c.writeQueued() } @@ -95,10 +95,10 @@ func (c *connection) Hset(hash, key, data []byte) bool { } func (c *connection) Multi() bool { - if c.session.InTransaction() { + if c.session.inTransaction() { log.Println("WARN: Already running transaction.") } else { - if err := c.session.BeginTransaction(); err != nil { + if err := c.session.beginTransaction(); err != nil { return c.writeError(err) } } @@ -106,12 +106,12 @@ func (c *connection) Multi() bool { } func (c *connection) Exec() bool { - if !c.session.InTransaction() { + if !c.session.inTransaction() { return c.writeEmptyArray() } arr := c.boolArray c.boolArray = []bool{} - if err := c.session.CommitTransaction(); err != nil { + if err := c.session.commitTransaction(); err != nil { return c.writeError(err) } return c.writeBoolArray(arr) @@ -126,7 +126,7 @@ func (c *connection) Hkeys(hash []byte) bool { ) defer close(done) - if keys, n, err = c.session.AllKeys(hash, done); err != nil { + if keys, n, err = c.session.allKeys(hash, done); err != nil { return c.writeError(err) } @@ -153,12 +153,12 @@ func (c *connection) Ping() bool { func (c *connection) HSpatial(hash, first, second []byte) bool { var ( err error - blocks <-chan Block + blocks <-chan block done = make(chan struct{}) ) defer close(done) - if blocks, err = c.session.SpatialQuery(hash, first, second, done); err != nil { + if blocks, err = c.session.spatialQuery(hash, first, second, done); err != nil { return c.writeError(err) } diff --git a/cmd/mtredisalize/leveldb.go b/cmd/mtredisalize/leveldb.go index 2214ec0..2bec234 100644 --- a/cmd/mtredisalize/leveldb.go +++ b/cmd/mtredisalize/leveldb.go @@ -75,7 +75,7 @@ func NewLeveDBBackend( if !interleaved { if err = ldb.buildCoverage(); err != nil { - ldb.Shutdown() + ldb.shutdown() ldb = nil return } @@ -108,18 +108,18 @@ func (ldb *LevelDBBackend) buildCoverage() error { return nil } -func (ldb *LevelDBBackend) NewSession() (Session, error) { +func (ldb *LevelDBBackend) newSession() (session, error) { return &LevelDBSession{ldb, nil}, nil } -func (ldbs *LevelDBSession) Close() error { +func (ldbs *LevelDBSession) close() error { if ldbs.tx != nil { ldbs.tx.Close() } return nil } -func (ldb *LevelDBBackend) Shutdown() error { +func (ldb *LevelDBBackend) shutdown() error { ldb.db.Close() if ldb.cache != nil { ldb.cache.Close() @@ -127,7 +127,7 @@ func (ldb *LevelDBBackend) Shutdown() error { return nil } -func (ldbs *LevelDBSession) Del(_, key []byte) (success bool, err error) { +func (ldbs *LevelDBSession) del(_, key []byte) (success bool, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } @@ -149,7 +149,7 @@ func (ldbs *LevelDBSession) Del(_, key []byte) (success bool, err error) { return } -func (ldbs *LevelDBSession) Fetch(_, key []byte) (value []byte, err error) { +func (ldbs *LevelDBSession) fetch(_, key []byte) (value []byte, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } @@ -164,7 +164,7 @@ func (ldbs *LevelDBSession) Fetch(_, key []byte) (value []byte, err error) { return } -func (ldbs *LevelDBSession) InTransaction() bool { +func (ldbs *LevelDBSession) inTransaction() bool { return ldbs.tx != nil } @@ -179,7 +179,7 @@ func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) { return } -func (ldbs *LevelDBSession) Store(_, key, value []byte) (exists bool, err error) { +func (ldbs *LevelDBSession) store(_, key, value []byte) (exists bool, err error) { origKey := key if key, err = ldbs.backend.decoder(key); err != nil { return @@ -214,12 +214,12 @@ func (ldbs *LevelDBSession) Store(_, key, value []byte) (exists bool, err error) return } -func (ldbs *LevelDBSession) BeginTransaction() error { +func (ldbs *LevelDBSession) beginTransaction() error { ldbs.tx = leveldb.NewWriteBatch() return nil } -func (ldbs *LevelDBSession) CommitTransaction() (err error) { +func (ldbs *LevelDBSession) commitTransaction() (err error) { tx := ldbs.tx if tx == nil { log.Println("WARN: No transaction running.") @@ -234,7 +234,7 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) { return } -func (ldbs *LevelDBSession) AllKeys( +func (ldbs *LevelDBSession) allKeys( _ []byte, done <-chan struct{}) (<-chan []byte, int, error) { @@ -282,9 +282,9 @@ func (ldbs *LevelDBSession) AllKeys( return keys, n, nil } -func (ldbs *LevelDBSession) SpatialQuery( +func (ldbs *LevelDBSession) spatialQuery( _, first, second []byte, - done <-chan struct{}) (<-chan Block, error) { + done <-chan struct{}) (<-chan block, error) { if ldbs.backend.interleaved { return ldbs.interleavedSpatialQuery(first, second, done) @@ -294,7 +294,7 @@ func (ldbs *LevelDBSession) SpatialQuery( func (ldbs *LevelDBSession) plainSpatialQuery( first, second []byte, - done <-chan struct{}) (<-chan Block, error) { + done <-chan struct{}) (<-chan block, error) { var ( firstKey int64 @@ -311,7 +311,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery( c2 := common.PlainToCoord(secondKey) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) - blocks := make(chan Block) + blocks := make(chan block) go func() { defer close(blocks) @@ -339,7 +339,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery( } if value != nil { select { - case blocks <- Block{Key: key, Data: value}: + case blocks <- block{Key: key, Data: value}: case <-done: return } @@ -354,7 +354,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery( func (ldbs *LevelDBSession) interleavedSpatialQuery( first, second []byte, - done <-chan struct{}) (<-chan Block, error) { + done <-chan struct{}) (<-chan block, error) { var ( firstKey int64 @@ -371,7 +371,7 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery( c2 := common.ClipCoord(common.PlainToCoord(secondKey)) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) - blocks := make(chan Block) + blocks := make(chan block) go func() { defer close(blocks) @@ -407,7 +407,7 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery( return } select { - case blocks <- Block{Key: encodedKey, Data: it.Value()}: + case blocks <- block{Key: encodedKey, Data: it.Value()}: case <-done: return } diff --git a/cmd/mtredisalize/main.go b/cmd/mtredisalize/main.go index 1a750ed..4e9023e 100644 --- a/cmd/mtredisalize/main.go +++ b/cmd/mtredisalize/main.go @@ -75,7 +75,7 @@ func main() { var ( err error - backend Backend + backend backend gcDur time.Duration chDur time.Duration changeTracker *changeTracker @@ -115,7 +115,7 @@ func main() { } } - defer backend.Shutdown() + defer backend.shutdown() var listener net.Listener @@ -155,8 +155,8 @@ func main() { for { select { case conn := <-connChan: - var session Session - if session, err = backend.NewSession(); err != nil { + var session session + if session, err = backend.newSession(); err != nil { log.Printf("Cannot create session: %s\n", err) conn.Close() } else { diff --git a/cmd/mtredisalize/sqlite.go b/cmd/mtredisalize/sqlite.go index 2b2d356..f420b4d 100644 --- a/cmd/mtredisalize/sqlite.go +++ b/cmd/mtredisalize/sqlite.go @@ -49,11 +49,11 @@ type SQLiteSession struct { tx *sql.Tx } -func (sqlb *SQLiteBackend) NewSession() (Session, error) { +func (sqlb *SQLiteBackend) newSession() (session, error) { return &SQLiteSession{sqlb, nil}, nil } -func (ss *SQLiteSession) Close() error { +func (ss *SQLiteSession) close() error { t := ss.tx if t != nil { ss.tx = nil @@ -182,7 +182,7 @@ func (sqlb *SQLiteBackend) closeAll() error { return closeDB(&sqlb.db) } -func (sqlb *SQLiteBackend) Shutdown() error { +func (sqlb *SQLiteBackend) shutdown() error { globalLock.Lock() defer globalLock.Unlock() @@ -196,7 +196,7 @@ func (ss *SQLiteSession) txStmt(stmt *sql.Stmt) *sql.Stmt { return stmt } -func (ss *SQLiteSession) Del(_, key []byte) (success bool, err error) { +func (ss *SQLiteSession) del(_, key []byte) (success bool, err error) { var pos int64 if pos, err = ss.backend.decoder(key); err != nil { return @@ -226,7 +226,7 @@ func (ss *SQLiteSession) Del(_, key []byte) (success bool, err error) { return } -func (ss *SQLiteSession) Fetch(_, key []byte) (data []byte, err error) { +func (ss *SQLiteSession) fetch(_, key []byte) (data []byte, err error) { var pos int64 if pos, err = ss.backend.decoder(key); err != nil { return @@ -244,11 +244,11 @@ func (ss *SQLiteSession) Fetch(_, key []byte) (data []byte, err error) { return } -func (ss *SQLiteSession) InTransaction() bool { +func (ss *SQLiteSession) inTransaction() bool { return ss.tx != nil } -func (ss *SQLiteSession) Store(_, key, value []byte) (exists bool, err error) { +func (ss *SQLiteSession) store(_, key, value []byte) (exists bool, err error) { var pos int64 if pos, err = ss.backend.decoder(key); err != nil { return @@ -295,7 +295,7 @@ func (ss *SQLiteSession) Store(_, key, value []byte) (exists bool, err error) { return } -func (ss *SQLiteSession) BeginTransaction() (err error) { +func (ss *SQLiteSession) beginTransaction() (err error) { if ss.tx != nil { log.Println("WARN: Already running transaction.") return nil @@ -307,7 +307,7 @@ func (ss *SQLiteSession) BeginTransaction() (err error) { return } -func (ss *SQLiteSession) CommitTransaction() error { +func (ss *SQLiteSession) commitTransaction() error { tx := ss.tx if tx == nil { @@ -321,7 +321,7 @@ func (ss *SQLiteSession) CommitTransaction() error { return tx.Commit() } -func (ss *SQLiteSession) AllKeys( +func (ss *SQLiteSession) allKeys( _ []byte, done <-chan struct{}) (<-chan []byte, int, error) { globalLock.RLock() @@ -372,9 +372,9 @@ func (ss *SQLiteSession) AllKeys( return keys, n, nil } -func (ss *SQLiteSession) SpatialQuery( +func (ss *SQLiteSession) spatialQuery( _, first, second []byte, - done <-chan struct{}) (<-chan Block, error) { + done <-chan struct{}) (<-chan block, error) { if ss.backend.interleaved { return ss.interleavedSpatialQuery(first, second, done) @@ -385,7 +385,7 @@ func (ss *SQLiteSession) SpatialQuery( func (ss *SQLiteSession) interleavedSpatialQuery( first, second []byte, - done <-chan struct{}) (<-chan Block, error) { + done <-chan struct{}) (<-chan block, error) { var ( firstKey int64 @@ -402,7 +402,7 @@ func (ss *SQLiteSession) interleavedSpatialQuery( c2 := common.ClipCoord(common.PlainToCoord(secondKey)) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) - blocks := make(chan Block) + blocks := make(chan block) globalLock.RLock() @@ -437,7 +437,7 @@ func (ss *SQLiteSession) interleavedSpatialQuery( key := common.StringToBytes(common.CoordToPlain(c)) //fmt.Printf("sending: %q\n", c) select { - case blocks <- Block{Key: key, Data: data}: + case blocks <- block{Key: key, Data: data}: case <-done: return } @@ -465,7 +465,7 @@ func (ss *SQLiteSession) interleavedSpatialQuery( func (ss *SQLiteSession) plainSpatialQuery( first, second []byte, - done <-chan struct{}) (<-chan Block, error) { + done <-chan struct{}) (<-chan block, error) { var ( firstKey int64 @@ -482,7 +482,7 @@ func (ss *SQLiteSession) plainSpatialQuery( c2 := common.PlainToCoord(secondKey) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) - blocks := make(chan Block) + blocks := make(chan block) globalLock.RLock() @@ -511,7 +511,7 @@ func (ss *SQLiteSession) plainSpatialQuery( return false } select { - case blocks <- Block{Key: encodedKey, Data: data}: + case blocks <- block{Key: encodedKey, Data: data}: case <-done: return false }