Merged coverage-exp into default.

This commit is contained in:
Sascha L. Teichmann 2015-07-25 11:49:19 +02:00
commit edbd790268
7 changed files with 293 additions and 113 deletions

View File

@ -14,11 +14,13 @@ import (
) )
type LevelDBBackend struct { type LevelDBBackend struct {
cache *leveldb.Cache cache *leveldb.Cache
db *leveldb.DB db *leveldb.DB
interleaved bool interleaved bool
encoder common.KeyTranscoder coverage *common.Coverage3D
decoder common.KeyTranscoder encoder common.KeyTranscoder
decoder common.KeyTranscoder
changeTracker *ChangeTracker changeTracker *ChangeTracker
mutex sync.RWMutex mutex sync.RWMutex
} }
@ -71,9 +73,42 @@ func NewLeveDBBackend(
encoder: encoder, encoder: encoder,
decoder: decoder, decoder: decoder,
changeTracker: changeTracker} changeTracker: changeTracker}
if !interleaved {
if err = ldb.buildCoverage(); err != nil {
ldb.Shutdown()
ldb = nil
return
}
}
return return
} }
func (ldb *LevelDBBackend) buildCoverage() error {
log.Println("INFO: Start building coverage index (this may take some time)...")
coverage := common.NewCoverage3D()
ro := leveldb.NewReadOptions()
defer ro.Close()
ro.SetFillCache(false)
it := ldb.db.NewIterator(ro)
it.SeekToFirst()
for ; it.Valid(); it.Next() {
c, err := common.DecodeStringBytesToCoord(it.Key())
if err != nil {
return err
}
coverage.Insert(c)
}
if err := it.GetError(); err != nil {
return err
}
ldb.coverage = coverage
log.Println("INFO: Finished building coverage index.")
return nil
}
func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) { func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) {
ldb.mutex.RLock() ldb.mutex.RLock()
f(ldb.db) f(ldb.db)
@ -156,6 +191,13 @@ func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err err
}) })
// This technically too early because this done in transactions // This technically too early because this done in transactions
// which are commited (and possible fail) later. // which are commited (and possible fail) later.
if ldbs.backend.coverage != nil {
c, err := common.DecodeStringBytesToCoord(origKey)
if err != nil {
return false, err
}
ldbs.backend.coverage.Insert(c)
}
if ldbs.backend.changeTracker != nil { if ldbs.backend.changeTracker != nil {
ldbs.backend.changeTracker.BlockChanged(origKey) ldbs.backend.changeTracker.BlockChanged(origKey)
} }
@ -184,7 +226,10 @@ func (ldbs *LevelDBSession) CommitTransaction() (err error) {
return return
} }
func (ldbs *LevelDBSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) { func (ldbs *LevelDBSession) AllKeys(
hash []byte,
done chan struct{}) (keys chan []byte, n int, err error) {
ldbs.backend.mutex.RLock() ldbs.backend.mutex.RLock()
ro := leveldb.NewReadOptions() ro := leveldb.NewReadOptions()
@ -232,14 +277,19 @@ func (ldbs *LevelDBSession) AllKeys(hash []byte, done chan struct{}) (keys chan
return return
} }
func (ldbs *LevelDBSession) SpatialQuery(hash, first, second []byte, done chan struct{}) (chan Block, error) { func (ldbs *LevelDBSession) SpatialQuery(
hash, first, second []byte,
done chan struct{}) (chan Block, error) {
if ldbs.backend.interleaved { if ldbs.backend.interleaved {
return ldbs.interleavedSpatialQuery(first, second, done) return ldbs.interleavedSpatialQuery(first, second, done)
} }
return ldbs.plainSpatialQuery(first, second, done) return ldbs.plainSpatialQuery(first, second, done)
} }
func (ldbs *LevelDBSession) plainSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) { func (ldbs *LevelDBSession) plainSpatialQuery(
first, second []byte,
done chan struct{}) (blocks chan Block, err error) {
var ( var (
firstKey int64 firstKey int64
@ -264,50 +314,44 @@ func (ldbs *LevelDBSession) plainSpatialQuery(first, second []byte, done chan st
ro := leveldb.NewReadOptions() ro := leveldb.NewReadOptions()
defer ro.Close() defer ro.Close()
ro.SetFillCache(false)
it := ldbs.backend.db.NewIterator(ro) var a, b common.Coord
defer it.Close()
a, b := common.Coord{X: c1.X}, common.Coord{X: c2.X} for _, r := range ldbs.backend.coverage.Query(c1, c2) {
a.Z, b.Z = int16(r.Z), int16(r.Z)
var err error a.X, b.X = int16(r.X1), int16(r.X2)
for a.Y = r.Y2; a.Y >= r.Y1; a.Y-- {
for a.Z = c1.Z; a.Z <= c2.Z; a.Z++ {
b.Z = a.Z
for a.Y = c1.Y; a.Y <= c2.Y; a.Y++ {
b.Y = a.Y b.Y = a.Y
from, to := order(common.CoordToPlain(a), common.CoordToPlain(b)) // The keys in the database are stored and ordered as strings
it.Seek(common.StringToBytes(from)) // "1", "10", ..., "19", "2", "20", "21" so you cannot use
for ; it.Valid(); it.Next() { // an iterator and assume it is numerical ordered.
var ( // Each block is fetched with a Get instead.
key = it.Key() for f, t := common.CoordToPlain(a), common.CoordToPlain(b); f <= t; f++ {
pos int64 key := common.StringToBytes(f)
) value, err := ldbs.backend.db.Get(ro, key)
if pos, err = common.DecodeStringFromBytes(key); err != nil { if err != nil {
log.Printf("decoding key failed: %s\n", err) log.Printf("get failed: %s\n", err)
return return
} }
if pos > to { if value != nil {
break select {
} case blocks <- Block{Key: key, Data: value}:
select { case <-done:
case blocks <- Block{Key: key, Data: it.Value()}: return
case <-done: }
return
} }
} }
if err = it.GetError(); err != nil {
log.Printf("iterating failed: %s\n", err)
return
}
} }
} }
}() }()
return return
} }
func (ldbs *LevelDBSession) interleavedSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) { func (ldbs *LevelDBSession) interleavedSpatialQuery(
first, second []byte,
done chan struct{}) (blocks chan Block, err error) {
var ( var (
firstKey int64 firstKey int64
secondKey int64 secondKey int64

View File

@ -32,6 +32,7 @@ type SqliteBackend struct {
decoder common.KeyDecoder decoder common.KeyDecoder
changeTracker *ChangeTracker changeTracker *ChangeTracker
interleaved bool interleaved bool
coverage *common.Coverage3D
existsStmt *sql.Stmt existsStmt *sql.Stmt
fetchStmt *sql.Stmt fetchStmt *sql.Stmt
insertStmt *sql.Stmt insertStmt *sql.Stmt
@ -112,10 +113,38 @@ func NewSqliteBackend(
res.decoder = common.DecodeStringFromBytes res.decoder = common.DecodeStringFromBytes
} }
if !interleaved {
if err = res.buildCoverage(); err != nil {
return
}
}
sqlb = &res sqlb = &res
return return
} }
func (sb *SqliteBackend) buildCoverage() (err error) {
log.Println("INFO: Start building coverage index (this may take some time)...")
sb.coverage = common.NewCoverage3D()
var rows *sql.Rows
if rows, err = sb.keysStmt.Query(); err != nil {
return
}
defer rows.Close()
for rows.Next() {
var key int64
if err = rows.Scan(&key); err != nil {
return
}
sb.coverage.Insert(common.PlainToCoord(key))
}
err = rows.Err()
log.Println("INFO: Finished building coverage index.")
return
}
func closeStmt(stmt **sql.Stmt) error { func closeStmt(stmt **sql.Stmt) error {
s := *stmt s := *stmt
if s != nil { if s != nil {
@ -212,6 +241,10 @@ func (ss *SqliteSession) Store(hash, key, value []byte) (exists bool, err error)
} }
// This technically too early because this done in transactions // This technically too early because this done in transactions
// which are commited (and possible fail) later. // which are commited (and possible fail) later.
if ss.backend.coverage != nil {
ss.backend.coverage.Insert(common.PlainToCoord(pos))
}
if ss.backend.changeTracker != nil { if ss.backend.changeTracker != nil {
ss.backend.changeTracker.BlockChanged(key) ss.backend.changeTracker.BlockChanged(key)
} }
@ -244,7 +277,9 @@ func (ss *SqliteSession) CommitTransaction() error {
return tx.Commit() return tx.Commit()
} }
func (ss *SqliteSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) { func (ss *SqliteSession) AllKeys(
hash []byte,
done chan struct{}) (keys chan []byte, n int, err error) {
globalLock.RLock() globalLock.RLock()
countStmt := ss.txStmt(ss.backend.countStmt) countStmt := ss.txStmt(ss.backend.countStmt)
@ -291,7 +326,9 @@ func (ss *SqliteSession) AllKeys(hash []byte, done chan struct{}) (keys chan []b
return return
} }
func (ss *SqliteSession) SpatialQuery(hash, first, second []byte, done chan struct{}) (chan Block, error) { func (ss *SqliteSession) SpatialQuery(
hash, first, second []byte,
done chan struct{}) (chan Block, error) {
if ss.backend.interleaved { if ss.backend.interleaved {
return ss.interleavedSpatialQuery(first, second, done) return ss.interleavedSpatialQuery(first, second, done)
@ -300,7 +337,9 @@ func (ss *SqliteSession) SpatialQuery(hash, first, second []byte, done chan stru
return ss.plainSpatialQuery(first, second, done) return ss.plainSpatialQuery(first, second, done)
} }
func (ss *SqliteSession) interleavedSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) { func (ss *SqliteSession) interleavedSpatialQuery(
first, second []byte,
done chan struct{}) (blocks chan Block, err error) {
var ( var (
firstKey int64 firstKey int64
secondKey int64 secondKey int64
@ -376,7 +415,9 @@ func (ss *SqliteSession) interleavedSpatialQuery(first, second []byte, done chan
return return
} }
func (ss *SqliteSession) plainSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) { func (ss *SqliteSession) plainSpatialQuery(
first, second []byte,
done chan struct{}) (blocks chan Block, err error) {
var ( var (
firstKey int64 firstKey int64
@ -401,45 +442,50 @@ func (ss *SqliteSession) plainSpatialQuery(first, second []byte, done chan struc
defer close(blocks) defer close(blocks)
rangeStmt := ss.txStmt(ss.backend.rangeStmt) rangeStmt := ss.txStmt(ss.backend.rangeStmt)
a, b := common.Coord{X: c1.X}, common.Coord{X: c2.X} send := func(rows *sql.Rows, err error) bool {
if err != nil {
log.Printf("Error in range query: %s\n", err)
return false
}
defer rows.Close()
for a.Z = c1.Z; a.Z <= c2.Z; a.Z++ { for rows.Next() {
b.Z = a.Z var key int64
for a.Y = c1.Y; a.Y <= c2.Y; a.Y++ { var data []byte
if err = rows.Scan(&key, &data); err != nil {
log.Printf("Error in scanning row: %s\n", err)
return false
}
var encodedKey []byte
if encodedKey, err = common.EncodeStringToBytes(key); err != nil {
log.Printf("Key encoding failed: %s\n", err)
return false
}
select {
case blocks <- Block{Key: encodedKey, Data: data}:
case <-done:
return false
}
}
if err = rows.Err(); err != nil {
log.Printf("Error in range query: %s\n", err)
return false
}
return true
}
var a, b common.Coord
for _, r := range ss.backend.coverage.Query(c1, c2) {
a.Z, b.Z = int16(r.Z), int16(r.Z)
a.X, b.X = int16(r.X1), int16(r.X2)
// log.Printf("y1 y2 x1 x2 z: %d %d, %d %d, %d\n", r.Y1, r.Y2, r.X1, r.X2, r.Z)
for a.Y = r.Y2; a.Y >= r.Y1; a.Y-- {
b.Y = a.Y b.Y = a.Y
var ( from, to := common.CoordToPlain(a), common.CoordToPlain(b)
err error if !send(rangeStmt.Query(from, to)) {
rows *sql.Rows
)
// Ordering should not be necessary.
from, to := order(common.CoordToPlain(a), common.CoordToPlain(b))
if rows, err = rangeStmt.Query(from, to); err != nil {
log.Printf("Error in range query: %s\n", err)
return return
} }
for rows.Next() {
var key int64
var data []byte
if err = rows.Scan(&key, &data); err != nil {
log.Printf("Error in scanning row: %s\n", err)
break
}
var encodedKey []byte
if encodedKey, err = common.EncodeStringToBytes(key); err != nil {
log.Printf("Key encoding failed: %s\n", err)
break
}
select {
case blocks <- Block{Key: encodedKey, Data: data}:
case <-done:
rows.Close()
return
}
}
if err = rows.Err(); err != nil {
log.Printf("Error in range query: %s\n", err)
}
rows.Close()
} }
} }
}() }()

View File

@ -132,7 +132,7 @@ func (btc *BaseTileCreator) CreateTile(x, z int16, i, j int) error {
return err return err
} }
} }
log.Printf("Writing empty (%d, %d) to file %s\n", x, z, path) //log.Printf("Writing empty (%d, %d) to file %s\n", x, z, path)
return ioutil.WriteFile(path, btc.emptyImage, 0666) return ioutil.WriteFile(path, btc.emptyImage, 0666)
} }

View File

@ -45,20 +45,6 @@ func (c Coord) String() string {
return fmt.Sprintf("(%d, %d, %d)", c.X, c.Y, c.Z) return fmt.Sprintf("(%d, %d, %d)", c.X, c.Y, c.Z)
} }
func minComponent(a, b int16) int16 {
if a < b {
return a
}
return b
}
func maxComponent(a, b int16) int16 {
if a > b {
return a
}
return b
}
func clipComponent(x int16) int16 { func clipComponent(x int16) int16 {
if x < minValue { if x < minValue {
return minValue return minValue
@ -78,16 +64,16 @@ func ClipCoord(c Coord) Coord {
func MinCoord(a, b Coord) Coord { func MinCoord(a, b Coord) Coord {
return Coord{ return Coord{
X: minComponent(a.X, b.X), X: min16(a.X, b.X),
Y: minComponent(a.Y, b.Y), Y: min16(a.Y, b.Y),
Z: minComponent(a.Z, b.Z)} Z: min16(a.Z, b.Z)}
} }
func MaxCoord(a, b Coord) Coord { func MaxCoord(a, b Coord) Coord {
return Coord{ return Coord{
X: maxComponent(a.X, b.X), X: max16(a.X, b.X),
Y: maxComponent(a.Y, b.Y), Y: max16(a.Y, b.Y),
Z: maxComponent(a.Z, b.Z)} Z: max16(a.Z, b.Z)}
} }
// DecodeStringFromBytes constructs a database key out of byte slice. // DecodeStringFromBytes constructs a database key out of byte slice.

82
common/coverage.go Normal file
View File

@ -0,0 +1,82 @@
package common
import "sync"
type zRange struct {
y1 int16
y2 int16
xRange *Span
}
type Coverage3D struct {
pool *SpanPool
zRanges map[int16]*zRange
mu sync.RWMutex
}
type Range struct {
Z int16
Y1 int16
Y2 int16
X1 int16
X2 int16
}
func NewCoverage3D() *Coverage3D {
return &Coverage3D{
pool: NewSpanPool(),
zRanges: map[int16]*zRange{}}
}
func (c3d *Coverage3D) Insert(c Coord) {
c3d.mu.Lock()
defer c3d.mu.Unlock()
zr := c3d.zRanges[c.Z]
if zr == nil {
xr := c3d.pool.Alloc()
xr.From = int32(c.X)
xr.To = int32(c.X)
xr.Next = nil
c3d.zRanges[c.Z] = &zRange{
y1: c.Y,
y2: c.Y,
xRange: xr}
return
}
zr.xRange = c3d.pool.Insert(zr.xRange, int32(c.X), 0)
if c.Y < zr.y1 {
zr.y1 = c.Y
}
if c.Y > zr.y2 {
zr.y2 = c.Y
}
}
func (c3d *Coverage3D) Query(c1, c2 Coord) []Range {
c1, c2 = MinCoord(c1, c2), MaxCoord(c1, c2)
c3d.mu.RLock()
defer c3d.mu.RUnlock()
r := make([]Range, 0, 32)
for z := c1.Z; z <= c2.Z; z++ {
zr := c3d.zRanges[z]
if zr == nil || c1.Y > zr.y2 || c2.Y < zr.y1 {
continue
}
y1, y2 := max16(c1.Y, zr.y1), min16(c2.Y, zr.y2)
for xr := zr.xRange; xr != nil && xr.From <= int32(c2.X); xr = xr.Next {
if xr.To < int32(c1.X) {
continue
}
r = append(r, Range{
Z: z,
Y1: y1,
Y2: y2,
X1: max16(c1.X, int16(xr.From)),
X2: min16(c2.X, int16(xr.To))})
}
}
return r
}

36
common/math.go Normal file
View File

@ -0,0 +1,36 @@
package common
func max(a, b int) int {
if a > b {
return a
}
return b
}
func max32(a, b int32) int32 {
if a > b {
return a
}
return b
}
func min32(a, b int32) int32 {
if a < b {
return a
}
return b
}
func max16(a, b int16) int16 {
if a > b {
return a
}
return b
}
func min16(a, b int16) int16 {
if a < b {
return a
}
return b
}

View File

@ -49,20 +49,6 @@ func (yo *YOrder) Reset() {
yo.blocks = yo.blocks[0:0] yo.blocks = yo.blocks[0:0]
} }
func max(a, b int) int {
if a > b {
return a
}
return b
}
func max32(a, b int32) int32 {
if a > b {
return a
}
return b
}
func copyData(data []byte) []byte { func copyData(data []byte) []byte {
l := len(data) l := len(data)
ndata := make([]byte, l, max(l, 8*1024)) ndata := make([]byte, l, max(l, 8*1024))