Stop leaking go routines if something with AllKeys went wrong.

This commit is contained in:
Sascha L. Teichmann 2014-08-31 19:21:58 +02:00
parent 48259f14a6
commit 3929ffc3b2
4 changed files with 22 additions and 10 deletions

View File

@ -9,7 +9,7 @@ type (
Fetch(hash, key []byte) ([]byte, error) Fetch(hash, key []byte) ([]byte, error)
InTransaction() bool InTransaction() bool
Store(hash, key, value []byte) (bool, error) Store(hash, key, value []byte) (bool, error)
AllKeys(hash []byte) (chan []byte, int, error) AllKeys(hash []byte, done chan struct{}) (chan []byte, int, error)
BeginTransaction() error BeginTransaction() error
CommitTransaction() error CommitTransaction() error
Close() error Close() error

View File

@ -100,10 +100,15 @@ func (c *Connection) Exec() bool {
} }
func (c *Connection) Hkeys(hash []byte) bool { func (c *Connection) Hkeys(hash []byte) bool {
var err error var (
var n int err error
var keys chan []byte n int
if keys, n, err = c.session.AllKeys(hash); err != nil { keys chan []byte
done = make(chan struct{})
)
defer close(done)
if keys, n, err = c.session.AllKeys(hash, done); err != nil {
return c.writeError(err) return c.writeError(err)
} }

View File

@ -175,7 +175,7 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) {
return return
} }
func (ldbs *LevelDBSession) AllKeys(hash []byte) (keys chan []byte, n int, err error) { func (ldbs *LevelDBSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) {
ldbs.backend.mutex.RLock() ldbs.backend.mutex.RLock()
ro := leveldb.NewReadOptions() ro := leveldb.NewReadOptions()
@ -205,7 +205,11 @@ func (ldbs *LevelDBSession) AllKeys(hash []byte) (keys chan []byte, n int, err e
encoder := ldbs.backend.encoder encoder := ldbs.backend.encoder
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
if key, err := encoder(it.Key()); err == nil { if key, err := encoder(it.Key()); err == nil {
keys <- key select {
case keys <- key:
case <-done:
return
}
} else { } else {
log.Printf("WARN: %s\n", err) log.Printf("WARN: %s\n", err)
return return

View File

@ -228,7 +228,7 @@ func (ss *SqliteSession) CommitTransaction() error {
return tx.Commit() return tx.Commit()
} }
func (ss *SqliteSession) AllKeys(hash []byte) (keys chan []byte, n int, err error) { func (ss *SqliteSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) {
globalLock.RLock() globalLock.RLock()
countStmt := ss.txStmt(ss.backend.countStmt) countStmt := ss.txStmt(ss.backend.countStmt)
@ -264,8 +264,11 @@ func (ss *SqliteSession) AllKeys(hash []byte) (keys chan []byte, n int, err erro
log.Printf("Cannot encode key: %d %s\n", key, err) log.Printf("Cannot encode key: %d %s\n", key, err)
break break
} }
select {
keys <- encoded case keys <- encoded:
case <-done:
return
}
} }
}() }()