Fixed non-interleaved LevelDB backend.

This commit is contained in:
Sascha L. Teichmann 2015-07-24 13:15:05 +02:00
parent fe6a551f06
commit c9686a7f24
1 changed files with 16 additions and 24 deletions

View File

@ -314,42 +314,34 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
ro := leveldb.NewReadOptions()
defer ro.Close()
ro.SetFillCache(false)
it := ldbs.backend.db.NewIterator(ro)
defer it.Close()
var a, b common.Coord
var err error
for _, r := range ldbs.backend.coverage.Query(c1, c2) {
a.Z, b.Z = int16(r.Z), int16(r.Z)
a.X, b.X = int16(r.X1), int16(r.X2)
// log.Printf("y1 y2 x1 x2 z: %d %d, %d %d, %d\n", r.Y1, r.Y2, r.X1, r.X2, r.Z)
for a.Y = r.Y2; a.Y >= r.Y1; a.Y-- {
b.Y = a.Y
from, to := common.CoordToPlain(a), common.CoordToPlain(b)
it.Seek(common.StringToBytes(from))
for ; it.Valid(); it.Next() {
key := it.Key()
var pos int64
if pos, err = common.DecodeStringFromBytes(key); err != nil {
log.Printf("decoding key failed: %s\n", err)
// The keys in the database are stored and ordered as strings
// "1", "10", ..., "19", "2", "20", "21" so you cannot use
// an iterator and assume it is numerical ordered.
// Each block is fetched with a Get instead.
for f, t := common.CoordToPlain(a), common.CoordToPlain(b); f <= t; f++ {
key := common.StringToBytes(f)
value, err := ldbs.backend.db.Get(ro, key)
if err != nil {
log.Printf("get failed: %s\n", err)
return
}
if pos > to {
break
}
select {
case blocks <- Block{Key: key, Data: it.Value()}:
case <-done:
return
if value != nil {
select {
case blocks <- Block{Key: key, Data: value}:
case <-done:
return
}
}
}
if err = it.GetError(); err != nil {
log.Printf("iterating failed: %s\n", err)
return
}
}
}
}()