Be more explicit about data flow directions of the channels in mtredisalize.

This commit is contained in:
Sascha L. Teichmann 2016-04-24 20:47:55 +02:00
parent 4dc43881c6
commit d21fa6c027
4 changed files with 46 additions and 38 deletions

View File

@ -22,9 +22,9 @@ type (
// Store stores a block with a given position and data.
Store(hash, key, value []byte) (bool, error)
// AllKeys returns all keys in the database.
AllKeys(hash []byte, done chan struct{}) (chan []byte, int, error)
AllKeys(hash []byte, done <-chan struct{}) (<-chan []byte, int, error)
// SpatialQuery performs a box query between the positions first and second.
SpatialQuery(hash, first, second []byte, done chan struct{}) (chan Block, error)
SpatialQuery(hash, first, second []byte, done <-chan struct{}) (<-chan Block, error)
// BeginTransaction starts a transcation.
BeginTransaction() error
// CommitTransaction finishes a transaction.
@ -33,7 +33,7 @@ type (
Close() error
}
// Backend is interface representing a database.
// Backend is the interface representing a database.
Backend interface {
// NewSession opens a new session.
NewSession() (Session, error)

View File

@ -112,7 +112,7 @@ func (c *Connection) Hkeys(hash []byte) bool {
var (
err error
n int
keys chan []byte
keys <-chan []byte
done = make(chan struct{})
)
defer close(done)
@ -145,7 +145,7 @@ func (c *Connection) Ping() bool {
func (c *Connection) HSpatial(hash, first, second []byte) bool {
var (
err error
blocks chan Block
blocks <-chan Block
done = make(chan struct{})
)
defer close(done)

View File

@ -233,7 +233,7 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) {
func (ldbs *LevelDBSession) AllKeys(
hash []byte,
done chan struct{}) (keys chan []byte, n int, err error) {
done <-chan struct{}) (<-chan []byte, int, error) {
ldbs.backend.mutex.RLock()
@ -242,18 +242,19 @@ func (ldbs *LevelDBSession) AllKeys(
it := ldbs.backend.db.NewIterator(ro)
it.SeekToFirst()
var n int
for ; it.Valid(); it.Next() {
n++
}
if err = it.GetError(); err != nil {
if err := it.GetError(); err != nil {
it.Close()
ro.Close()
ldbs.backend.mutex.RUnlock()
return
return nil, n, err
}
keys = make(chan []byte)
keys := make(chan []byte)
go func() {
ldbs.backend.mutex.RUnlock()
@ -279,12 +280,12 @@ func (ldbs *LevelDBSession) AllKeys(
}
}()
return
return keys, n, nil
}
func (ldbs *LevelDBSession) SpatialQuery(
hash, first, second []byte,
done chan struct{}) (chan Block, error) {
done <-chan struct{}) (<-chan Block, error) {
if ldbs.backend.interleaved {
return ldbs.interleavedSpatialQuery(first, second, done)
@ -294,23 +295,24 @@ func (ldbs *LevelDBSession) SpatialQuery(
func (ldbs *LevelDBSession) plainSpatialQuery(
first, second []byte,
done chan struct{}) (blocks chan Block, err error) {
done <-chan struct{}) (<-chan Block, error) {
var (
firstKey int64
secondKey int64
err error
)
if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
return
return nil, err
}
if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
return
return nil, err
}
c1 := common.PlainToCoord(firstKey)
c2 := common.PlainToCoord(secondKey)
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks = make(chan Block)
blocks := make(chan Block)
go func() {
defer close(blocks)
@ -350,28 +352,29 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
}
}
}()
return
return blocks, nil
}
func (ldbs *LevelDBSession) interleavedSpatialQuery(
first, second []byte,
done chan struct{}) (blocks chan Block, err error) {
done <-chan struct{}) (<-chan Block, error) {
var (
firstKey int64
secondKey int64
err error
)
if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
return
return nil, err
}
if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
return
return nil, err
}
c1 := common.ClipCoord(common.PlainToCoord(firstKey))
c2 := common.ClipCoord(common.PlainToCoord(secondKey))
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks = make(chan Block)
blocks := make(chan Block)
go func() {
defer close(blocks)
@ -427,5 +430,5 @@ func (ldbs *LevelDBSession) interleavedSpatialQuery(
return
}
}()
return
return blocks, nil
}

View File

@ -285,26 +285,28 @@ func (ss *SqliteSession) CommitTransaction() error {
func (ss *SqliteSession) AllKeys(
hash []byte,
done chan struct{}) (keys chan []byte, n int, err error) {
done <-chan struct{}) (<-chan []byte, int, error) {
globalLock.RLock()
countStmt := ss.txStmt(ss.backend.countStmt)
var n int
var err error
if err = countStmt.QueryRow().Scan(&n); err != nil {
if err == sql.ErrNoRows {
err = nil
}
globalLock.RUnlock()
return
return nil, n, err
}
keysStmt := ss.txStmt(ss.backend.keysStmt)
var rows *sql.Rows
if rows, err = keysStmt.Query(); err != nil {
globalLock.RUnlock()
return
return nil, n, err
}
keys = make(chan []byte)
keys := make(chan []byte)
go func() {
defer globalLock.RUnlock()
defer rows.Close()
@ -312,7 +314,7 @@ func (ss *SqliteSession) AllKeys(
var err error
for rows.Next() {
var key int64
if err := rows.Scan(&key); err != nil {
if err = rows.Scan(&key); err != nil {
log.Printf("WARN: %s\n", err)
break
}
@ -329,12 +331,12 @@ func (ss *SqliteSession) AllKeys(
}
}()
return
return keys, n, nil
}
func (ss *SqliteSession) SpatialQuery(
hash, first, second []byte,
done chan struct{}) (chan Block, error) {
done <-chan struct{}) (<-chan Block, error) {
if ss.backend.interleaved {
return ss.interleavedSpatialQuery(first, second, done)
@ -345,22 +347,24 @@ func (ss *SqliteSession) SpatialQuery(
func (ss *SqliteSession) interleavedSpatialQuery(
first, second []byte,
done chan struct{}) (blocks chan Block, err error) {
done <-chan struct{}) (<-chan Block, error) {
var (
firstKey int64
secondKey int64
err error
)
if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
return
return nil, err
}
if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
return
return nil, err
}
c1 := common.ClipCoord(common.PlainToCoord(firstKey))
c2 := common.ClipCoord(common.PlainToCoord(secondKey))
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks = make(chan Block)
blocks := make(chan Block)
globalLock.RLock()
@ -418,28 +422,29 @@ func (ss *SqliteSession) interleavedSpatialQuery(
}
}()
return
return blocks, nil
}
func (ss *SqliteSession) plainSpatialQuery(
first, second []byte,
done chan struct{}) (blocks chan Block, err error) {
done <-chan struct{}) (<-chan Block, error) {
var (
firstKey int64
secondKey int64
err error
)
if firstKey, err = common.DecodeStringFromBytes(first); err != nil {
return
return nil, err
}
if secondKey, err = common.DecodeStringFromBytes(second); err != nil {
return
return nil, err
}
c1 := common.PlainToCoord(firstKey)
c2 := common.PlainToCoord(secondKey)
c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2)
blocks = make(chan Block)
blocks := make(chan Block)
globalLock.RLock()
@ -496,5 +501,5 @@ func (ss *SqliteSession) plainSpatialQuery(
}
}()
return
return blocks, nil
}