// Copyright 2014 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "log" "sync" "bitbucket.org/s_l_teichmann/mtredisalize/common" leveldb "github.com/jmhodges/levigo" ) type LevelDBBackend struct { cache *leveldb.Cache db *leveldb.DB interleaved bool encoder common.KeyTranscoder decoder common.KeyTranscoder mutex sync.RWMutex } type LevelDBSession struct { backend *LevelDBBackend tx *leveldb.WriteBatch } func NewLeveDBBackend( path string, interleaved bool, cacheSize int) (ldb *LevelDBBackend, err error) { opts := leveldb.NewOptions() var cache *leveldb.Cache if cacheSize > 0 { cache = leveldb.NewLRUCache(cacheSize * 1024 * 1024) opts.SetCache(cache) } opts.SetCreateIfMissing(true) var db *leveldb.DB if db, err = leveldb.Open(path, opts); err != nil { if cache != nil { cache.Close() } return } var ( encoder common.KeyTranscoder decoder common.KeyTranscoder ) if interleaved { encoder = common.TranscodeInterleavedToPlain decoder = common.TranscodePlainToInterleaved } else { encoder = common.IdentityTranscoder decoder = common.IdentityTranscoder } ldb = &LevelDBBackend{ cache: cache, db: db, interleaved: interleaved, encoder: encoder, decoder: decoder} return } func (ldb *LevelDBBackend) doRead(f func(db *leveldb.DB)) { ldb.mutex.RLock() f(ldb.db) ldb.mutex.RUnlock() } func (ldb *LevelDBBackend) doWrite(f func(db *leveldb.DB)) { ldb.mutex.Lock() f(ldb.db) ldb.mutex.Unlock() } func (ldb *LevelDBBackend) NewSession() (Session, error) { return &LevelDBSession{ldb, nil}, nil } func (ldbs *LevelDBSession) Close() error { if ldbs.tx != nil { ldbs.tx.Close() } return nil } func (ldb *LevelDBBackend) Shutdown() error { ldb.db.Close() if ldb.cache != nil { ldb.cache.Close() } return nil } func (ldbs *LevelDBSession) Fetch(hash, key []byte) (value []byte, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } ldbs.backend.doRead(func(db *leveldb.DB) { ro := leveldb.NewReadOptions() value, err = ldbs.backend.db.Get(ro, key) //if err != nil { // log.Printf("Fetch key '%s' failed.\n", key) //} else { // log.Printf("Fetch key = '%s' len(value) = %d\n", key, len(value)) //} ro.Close() }) return } func (ldbs *LevelDBSession) InTransaction() bool { return ldbs.tx != nil } func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) { ro := leveldb.NewReadOptions() defer ro.Close() var data []byte if data, err = db.Get(ro, key); err != nil { return } exists = data != nil return } func (ldbs *LevelDBSession) Store(hash, key, value []byte) (exists bool, err error) { if key, err = ldbs.backend.decoder(key); err != nil { return } ldbs.backend.doWrite(func(db *leveldb.DB) { if exists, err = keyExists(db, key); err != nil { return } if ldbs.tx != nil { ldbs.tx.Put(key, value) return } wo := leveldb.NewWriteOptions() err = ldbs.backend.db.Put(wo, key, value) wo.Close() }) return } func (ldbs *LevelDBSession) BeginTransaction() error { ldbs.tx = leveldb.NewWriteBatch() return nil } func (ldbs *LevelDBSession) CommitTransaction() (err error) { tx := ldbs.tx if tx == nil { log.Println("WARN: No transaction running.") return } ldbs.tx = nil ldbs.backend.doWrite(func(db *leveldb.DB) { wo := leveldb.NewWriteOptions() wo.SetSync(true) err = db.Write(wo, tx) wo.Close() tx.Close() }) return } func (ldbs *LevelDBSession) AllKeys(hash []byte, done chan struct{}) (keys chan []byte, n int, err error) { ldbs.backend.mutex.RLock() ro := leveldb.NewReadOptions() ro.SetFillCache(false) it := ldbs.backend.db.NewIterator(ro) it.SeekToFirst() for ; it.Valid(); it.Next() { n++ } if err = it.GetError(); err != nil { it.Close() ro.Close() ldbs.backend.mutex.RUnlock() return } keys = make(chan []byte) go func() { ldbs.backend.mutex.RUnlock() defer ro.Close() defer close(keys) defer it.Close() it.SeekToFirst() encoder := ldbs.backend.encoder for ; it.Valid(); it.Next() { if key, err := encoder(it.Key()); err == nil { select { case keys <- key: case <-done: return } } else { log.Printf("WARN: %s\n", err) return } } if err := it.GetError(); err != nil { log.Printf("WARN: %s\n", err) } }() return } func (ldbs *LevelDBSession) SpatialQuery(hash, first, second []byte, done chan struct{}) (chan Block, error) { if ldbs.backend.interleaved { return ldbs.interleavedSpatialQuery(first, second, done) } return ldbs.plainSpatialQuery(first, second, done) } func (ldbs *LevelDBSession) plainSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) { var ( firstKey int64 secondKey int64 ) if firstKey, err = common.DecodeStringFromBytes(first); err != nil { return } if secondKey, err = common.DecodeStringFromBytes(second); err != nil { return } c1 := common.PlainToCoord(firstKey) c2 := common.PlainToCoord(secondKey) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) blocks = make(chan Block) go func() { defer close(blocks) ldbs.backend.mutex.RLock() defer ldbs.backend.mutex.RUnlock() ro := leveldb.NewReadOptions() defer ro.Close() ro.SetFillCache(false) it := ldbs.backend.db.NewIterator(ro) defer it.Close() a, b := common.Coord{X: c1.X}, common.Coord{X: c2.X} var err error for a.Z = c1.Z; a.Z <= c2.Z; a.Z++ { b.Z = a.Z for a.Y = c1.Y; a.Y <= c2.Y; a.Y++ { b.Y = a.Y from, to := order(common.CoordToPlain(a), common.CoordToPlain(b)) it.Seek(common.StringToBytes(from)) for ; it.Valid(); it.Next() { var ( key = it.Key() pos int64 ) if pos, err = common.DecodeStringFromBytes(key); err != nil { log.Printf("decoding key failed: %s", err) return } if pos > to { break } select { case blocks <- Block{Key: key, Data: it.Value()}: case <-done: return } } if err = it.GetError(); err != nil { log.Printf("iterating failed: %s", err) return } } } }() return } func (ldbs *LevelDBSession) interleavedSpatialQuery(first, second []byte, done chan struct{}) (blocks chan Block, err error) { var ( firstKey int64 secondKey int64 ) if firstKey, err = common.DecodeStringFromBytes(first); err != nil { return } if secondKey, err = common.DecodeStringFromBytes(second); err != nil { return } c1 := common.PlainToCoord(firstKey) c2 := common.PlainToCoord(secondKey) c1, c2 = common.MinCoord(c1, c2), common.MaxCoord(c1, c2) blocks = make(chan Block) go func() { defer close(blocks) ldbs.backend.mutex.RLock() defer ldbs.backend.mutex.RUnlock() ro := leveldb.NewReadOptions() defer ro.Close() ro.SetFillCache(false) it := ldbs.backend.db.NewIterator(ro) defer it.Close() zmin, zmax := common.CoordToInterleaved(c1), common.CoordToInterleaved(c2) // Should not be necessary. zmin, zmax = order(zmin, zmax) var ( cub = common.Cuboid{P1: c1, P2: c2} err error encodedKey []byte ) it.Seek(common.ToBigEndian(zmin)) for it.Valid() { zcode := common.FromBigEndian(it.Key()) if zcode > zmax { break } if c := common.InterleavedToCoord(zcode); cub.Contains(c) { if encodedKey, err = common.EncodeStringToBytes(common.CoordToPlain(c)); err != nil { log.Printf("error encoding key: %s", err) return } select { case blocks <- Block{Key: encodedKey, Data: it.Value()}: case <-done: return } it.Next() } else { it.Seek(common.ToBigEndian(common.NaiveBigMin(zmin, zmax, zcode))) } } if err = it.GetError(); err != nil { log.Printf("error while iterating: %s", err) return } }() return }