From 3929ffc3b25d46cbc2f8020ab35cba4cb5e1f5e1 Mon Sep 17 00:00:00 2001 From: "Sascha L. Teichmann" Date: Sun, 31 Aug 2014 19:21:58 +0200 Subject: [PATCH] Stop leaking go routines if something with AllKeys went wrong. --- backend.go | 2 +- connection.go | 13 +++++++++---- leveldb.go | 8 ++++++-- sqlite.go | 9 ++++++--- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/backend.go b/backend.go index 3165f79..0f4f2e2 100644 --- a/backend.go +++ b/backend.go @@ -9,7 +9,7 @@ type ( Fetch(hash, key []byte) ([]byte, error) InTransaction() bool Store(hash, key, value []byte) (bool, error) - AllKeys(hash []byte) (chan []byte, int, error) + AllKeys(hash []byte, done chan struct{}) (chan []byte, int, error) BeginTransaction() error CommitTransaction() error Close() error diff --git a/connection.go b/connection.go index 4bfc1bd..accb9b4 100644 --- a/connection.go +++ b/connection.go @@ -100,10 +100,15 @@ func (c *Connection) Exec() bool { } func (c *Connection) Hkeys(hash []byte) bool { - var err error - var n int - var keys chan []byte - if keys, n, err = c.session.AllKeys(hash); err != nil { + var ( + err error + n int + keys chan []byte + done = make(chan struct{}) + ) + defer close(done) + + if keys, n, err = c.session.AllKeys(hash, done); err != nil { return c.writeError(err) } diff --git a/leveldb.go b/leveldb.go index 27ebd24..47fa8b1 100644 --- a/leveldb.go +++ b/leveldb.go @@ -175,7 +175,7 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) { return } -func (ldbs *LevelDBSession) AllKeys(hash []byte) (keys chan []byte, n int, err error) { +func (ldbs *LevelDBSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) { ldbs.backend.mutex.RLock() ro := leveldb.NewReadOptions() @@ -205,7 +205,11 @@ func (ldbs *LevelDBSession) AllKeys(hash []byte) (keys chan []byte, n int, err e encoder := ldbs.backend.encoder for ; it.Valid(); it.Next() { if key, err := encoder(it.Key()); err == nil { - keys <- key + select { + case keys <- key: + case <-done: + return + } } else { log.Printf("WARN: %s\n", err) return diff --git a/sqlite.go b/sqlite.go index fcd9b6a..d27bc0a 100644 --- a/sqlite.go +++ b/sqlite.go @@ -228,7 +228,7 @@ func (ss *SqliteSession) CommitTransaction() error { return tx.Commit() } -func (ss *SqliteSession) AllKeys(hash []byte) (keys chan []byte, n int, err error) { +func (ss *SqliteSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) { globalLock.RLock() countStmt := ss.txStmt(ss.backend.countStmt) @@ -264,8 +264,11 @@ func (ss *SqliteSession) AllKeys(hash []byte) (keys chan []byte, n int, err erro log.Printf("Cannot encode key: %d %s\n", key, err) break } - - keys <- encoded + select { + case keys <- encoded: + case <-done: + return + } } }()