interleaver: Do not share error in producer and consumer.

This commit is contained in:
Sascha L. Teichmann 2014-08-24 19:40:57 +02:00
parent fc50897d2d
commit 307d1aec44

View File

@ -48,7 +48,7 @@ func selectKeyEncoder(interleaved bool) common.KeyEncoder {
return common.EncodeStringToBytes return common.EncodeStringToBytes
} }
func copyProducerToConsumer(producer BlockProducer, consumer BlockConsumer) (err error) { func copyProducerToConsumer(producer BlockProducer, consumer BlockConsumer) error {
blocks := make(chan *Block) blocks := make(chan *Block)
done := make(chan struct{}) done := make(chan struct{})
@ -58,7 +58,7 @@ func copyProducerToConsumer(producer BlockProducer, consumer BlockConsumer) (err
defer close(blocks) defer close(blocks)
for { for {
block := new(Block) block := new(Block)
if err = producer.Next(block); err != nil { if err := producer.Next(block); err != nil {
if err != ErrNoMoreBlocks { if err != ErrNoMoreBlocks {
log.Printf("Reading failed: %s\n", err) log.Printf("Reading failed: %s\n", err)
} }
@ -74,8 +74,8 @@ func copyProducerToConsumer(producer BlockProducer, consumer BlockConsumer) (err
i := 0 i := 0
for block := range blocks { for block := range blocks {
if err = consumer.Consume(block); err != nil { if err := consumer.Consume(block); err != nil {
return return err
} }
i++ i++
if i%1000 == 0 { if i%1000 == 0 {
@ -84,7 +84,7 @@ func copyProducerToConsumer(producer BlockProducer, consumer BlockConsumer) (err
} }
log.Printf("%d blocks transferred in total.\n", i) log.Printf("%d blocks transferred in total.\n", i)
return return nil
} }
func main() { func main() {