mirror of
https://bitbucket.org/s_l_teichmann/mtsatellite
synced 2025-01-24 15:20:21 +01:00
unexport redis and database interfaces in mtredisalize
This commit is contained in:
parent
75cc24c04a
commit
0c46058261
@ -44,8 +44,8 @@ func (c *connection) run() {
|
|||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
}()
|
}()
|
||||||
r := bufio.NewReaderSize(c.conn, 8*1024)
|
r := bufio.NewReaderSize(c.conn, 8*1024)
|
||||||
parser := NewRedisParser(r, c, c.maxBulkStringSize)
|
parser := newRedisParser(r, c, c.maxBulkStringSize)
|
||||||
parser.Parse()
|
parser.parse()
|
||||||
log.Println("client disconnected")
|
log.Println("client disconnected")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,7 +57,7 @@ func logError(err error) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *connection) Hdel(hash, key []byte) bool {
|
func (c *connection) hdel(hash, key []byte) bool {
|
||||||
|
|
||||||
success, err := c.session.del(hash, key)
|
success, err := c.session.del(hash, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -67,7 +67,7 @@ func (c *connection) Hdel(hash, key []byte) bool {
|
|||||||
return c.writeBool(success)
|
return c.writeBool(success)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *connection) Hget(hash, key []byte) bool {
|
func (c *connection) hget(hash, key []byte) bool {
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var data []byte
|
var data []byte
|
||||||
@ -78,7 +78,7 @@ func (c *connection) Hget(hash, key []byte) bool {
|
|||||||
return c.writeBlock(data)
|
return c.writeBlock(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *connection) Hset(hash, key, data []byte) bool {
|
func (c *connection) hset(hash, key, data []byte) bool {
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var exists bool
|
var exists bool
|
||||||
@ -94,7 +94,7 @@ func (c *connection) Hset(hash, key, data []byte) bool {
|
|||||||
return c.writeBool(exists)
|
return c.writeBool(exists)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *connection) Multi() bool {
|
func (c *connection) multi() bool {
|
||||||
if c.session.inTransaction() {
|
if c.session.inTransaction() {
|
||||||
log.Println("WARN: Already running transaction.")
|
log.Println("WARN: Already running transaction.")
|
||||||
} else {
|
} else {
|
||||||
@ -105,7 +105,7 @@ func (c *connection) Multi() bool {
|
|||||||
return c.writeOk()
|
return c.writeOk()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *connection) Exec() bool {
|
func (c *connection) exec() bool {
|
||||||
if !c.session.inTransaction() {
|
if !c.session.inTransaction() {
|
||||||
return c.writeEmptyArray()
|
return c.writeEmptyArray()
|
||||||
}
|
}
|
||||||
@ -117,7 +117,7 @@ func (c *connection) Exec() bool {
|
|||||||
return c.writeBoolArray(arr)
|
return c.writeBoolArray(arr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *connection) Hkeys(hash []byte) bool {
|
func (c *connection) hkeys(hash []byte) bool {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
n int
|
n int
|
||||||
@ -146,11 +146,11 @@ func (c *connection) Hkeys(hash []byte) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *connection) Ping() bool {
|
func (c *connection) ping() bool {
|
||||||
return c.writeMessage(redisPong)
|
return c.writeMessage(redisPong)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *connection) HSpatial(hash, first, second []byte) bool {
|
func (c *connection) hSpatial(hash, first, second []byte) bool {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
blocks <-chan block
|
blocks <-chan block
|
||||||
|
@ -12,7 +12,7 @@ import (
|
|||||||
leveldb "github.com/jmhodges/levigo"
|
leveldb "github.com/jmhodges/levigo"
|
||||||
)
|
)
|
||||||
|
|
||||||
type LevelDBBackend struct {
|
type levelDBBackend struct {
|
||||||
cache *leveldb.Cache
|
cache *leveldb.Cache
|
||||||
db *leveldb.DB
|
db *leveldb.DB
|
||||||
interleaved bool
|
interleaved bool
|
||||||
@ -23,16 +23,16 @@ type LevelDBBackend struct {
|
|||||||
changeTracker *changeTracker
|
changeTracker *changeTracker
|
||||||
}
|
}
|
||||||
|
|
||||||
type LevelDBSession struct {
|
type levelDBSession struct {
|
||||||
backend *LevelDBBackend
|
backend *levelDBBackend
|
||||||
tx *leveldb.WriteBatch
|
tx *leveldb.WriteBatch
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLeveDBBackend(
|
func newLeveDBBackend(
|
||||||
path string,
|
path string,
|
||||||
changeTracker *changeTracker,
|
changeTracker *changeTracker,
|
||||||
interleaved bool,
|
interleaved bool,
|
||||||
cacheSize int) (ldb *LevelDBBackend, err error) {
|
cacheSize int) (ldb *levelDBBackend, err error) {
|
||||||
|
|
||||||
opts := leveldb.NewOptions()
|
opts := leveldb.NewOptions()
|
||||||
|
|
||||||
@ -64,7 +64,7 @@ func NewLeveDBBackend(
|
|||||||
decoder = common.IdentityTranscoder
|
decoder = common.IdentityTranscoder
|
||||||
}
|
}
|
||||||
|
|
||||||
ldb = &LevelDBBackend{
|
ldb = &levelDBBackend{
|
||||||
cache: cache,
|
cache: cache,
|
||||||
db: db,
|
db: db,
|
||||||
interleaved: interleaved,
|
interleaved: interleaved,
|
||||||
@ -83,7 +83,7 @@ func NewLeveDBBackend(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldb *LevelDBBackend) buildCoverage() error {
|
func (ldb *levelDBBackend) buildCoverage() error {
|
||||||
log.Println("INFO: Start building coverage index (this may take some time)...")
|
log.Println("INFO: Start building coverage index (this may take some time)...")
|
||||||
|
|
||||||
coverage := common.NewCoverage3D()
|
coverage := common.NewCoverage3D()
|
||||||
@ -108,18 +108,18 @@ func (ldb *LevelDBBackend) buildCoverage() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldb *LevelDBBackend) newSession() (session, error) {
|
func (ldb *levelDBBackend) newSession() (session, error) {
|
||||||
return &LevelDBSession{ldb, nil}, nil
|
return &levelDBSession{ldb, nil}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) close() error {
|
func (ldbs *levelDBSession) close() error {
|
||||||
if ldbs.tx != nil {
|
if ldbs.tx != nil {
|
||||||
ldbs.tx.Close()
|
ldbs.tx.Close()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldb *LevelDBBackend) shutdown() error {
|
func (ldb *levelDBBackend) shutdown() error {
|
||||||
ldb.db.Close()
|
ldb.db.Close()
|
||||||
if ldb.cache != nil {
|
if ldb.cache != nil {
|
||||||
ldb.cache.Close()
|
ldb.cache.Close()
|
||||||
@ -127,7 +127,7 @@ func (ldb *LevelDBBackend) shutdown() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) del(_, key []byte) (success bool, err error) {
|
func (ldbs *levelDBSession) del(_, key []byte) (success bool, err error) {
|
||||||
if key, err = ldbs.backend.decoder(key); err != nil {
|
if key, err = ldbs.backend.decoder(key); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -149,7 +149,7 @@ func (ldbs *LevelDBSession) del(_, key []byte) (success bool, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) fetch(_, key []byte) (value []byte, err error) {
|
func (ldbs *levelDBSession) fetch(_, key []byte) (value []byte, err error) {
|
||||||
if key, err = ldbs.backend.decoder(key); err != nil {
|
if key, err = ldbs.backend.decoder(key); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -164,7 +164,7 @@ func (ldbs *LevelDBSession) fetch(_, key []byte) (value []byte, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) inTransaction() bool {
|
func (ldbs *levelDBSession) inTransaction() bool {
|
||||||
return ldbs.tx != nil
|
return ldbs.tx != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,7 +179,7 @@ func keyExists(db *leveldb.DB, key []byte) (exists bool, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) store(_, key, value []byte) (exists bool, err error) {
|
func (ldbs *levelDBSession) store(_, key, value []byte) (exists bool, err error) {
|
||||||
origKey := key
|
origKey := key
|
||||||
if key, err = ldbs.backend.decoder(key); err != nil {
|
if key, err = ldbs.backend.decoder(key); err != nil {
|
||||||
return
|
return
|
||||||
@ -214,12 +214,12 @@ func (ldbs *LevelDBSession) store(_, key, value []byte) (exists bool, err error)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) beginTransaction() error {
|
func (ldbs *levelDBSession) beginTransaction() error {
|
||||||
ldbs.tx = leveldb.NewWriteBatch()
|
ldbs.tx = leveldb.NewWriteBatch()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) commitTransaction() (err error) {
|
func (ldbs *levelDBSession) commitTransaction() (err error) {
|
||||||
tx := ldbs.tx
|
tx := ldbs.tx
|
||||||
if tx == nil {
|
if tx == nil {
|
||||||
log.Println("WARN: No transaction running.")
|
log.Println("WARN: No transaction running.")
|
||||||
@ -234,7 +234,7 @@ func (ldbs *LevelDBSession) commitTransaction() (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) allKeys(
|
func (ldbs *levelDBSession) allKeys(
|
||||||
_ []byte,
|
_ []byte,
|
||||||
done <-chan struct{}) (<-chan []byte, int, error) {
|
done <-chan struct{}) (<-chan []byte, int, error) {
|
||||||
|
|
||||||
@ -282,7 +282,7 @@ func (ldbs *LevelDBSession) allKeys(
|
|||||||
return keys, n, nil
|
return keys, n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) spatialQuery(
|
func (ldbs *levelDBSession) spatialQuery(
|
||||||
_, first, second []byte,
|
_, first, second []byte,
|
||||||
done <-chan struct{}) (<-chan block, error) {
|
done <-chan struct{}) (<-chan block, error) {
|
||||||
|
|
||||||
@ -292,7 +292,7 @@ func (ldbs *LevelDBSession) spatialQuery(
|
|||||||
return ldbs.plainSpatialQuery(first, second, done)
|
return ldbs.plainSpatialQuery(first, second, done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) plainSpatialQuery(
|
func (ldbs *levelDBSession) plainSpatialQuery(
|
||||||
first, second []byte,
|
first, second []byte,
|
||||||
done <-chan struct{}) (<-chan block, error) {
|
done <-chan struct{}) (<-chan block, error) {
|
||||||
|
|
||||||
@ -352,7 +352,7 @@ func (ldbs *LevelDBSession) plainSpatialQuery(
|
|||||||
return blocks, nil
|
return blocks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldbs *LevelDBSession) interleavedSpatialQuery(
|
func (ldbs *levelDBSession) interleavedSpatialQuery(
|
||||||
first, second []byte,
|
first, second []byte,
|
||||||
done <-chan struct{}) (<-chan block, error) {
|
done <-chan struct{}) (<-chan block, error) {
|
||||||
|
|
||||||
|
@ -105,11 +105,11 @@ func main() {
|
|||||||
path := flag.Arg(0)
|
path := flag.Arg(0)
|
||||||
|
|
||||||
if driver == "sqlite" {
|
if driver == "sqlite" {
|
||||||
if backend, err = NewSQLiteBackend(path, changeTracker, interleaved); err != nil {
|
if backend, err = newSQLiteBackend(path, changeTracker, interleaved); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if backend, err = NewLeveDBBackend(
|
if backend, err = newLeveDBBackend(
|
||||||
path, changeTracker, interleaved, cacheSize); err != nil {
|
path, changeTracker, interleaved, cacheSize); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -14,42 +14,42 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RedisCommands interface {
|
type redisCommands interface {
|
||||||
Hdel(hash, key []byte) bool
|
hdel(hash, key []byte) bool
|
||||||
Hget(hash, key []byte) bool
|
hget(hash, key []byte) bool
|
||||||
Hset(hash, key, block []byte) bool
|
hset(hash, key, block []byte) bool
|
||||||
Multi() bool
|
multi() bool
|
||||||
Exec() bool
|
exec() bool
|
||||||
Hkeys(hash []byte) bool
|
hkeys(hash []byte) bool
|
||||||
HSpatial(hash, first, second []byte) bool
|
hSpatial(hash, first, second []byte) bool
|
||||||
Ping() bool
|
ping() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type RedisParser struct {
|
type redisParser struct {
|
||||||
reader *bufio.Reader
|
reader *bufio.Reader
|
||||||
commands RedisCommands
|
commands redisCommands
|
||||||
missing int64
|
missing int64
|
||||||
args []interface{}
|
args []interface{}
|
||||||
maxBulkStringSize int64
|
maxBulkStringSize int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRedisParser(reader *bufio.Reader,
|
func newRedisParser(reader *bufio.Reader,
|
||||||
commands RedisCommands,
|
commands redisCommands,
|
||||||
maxBulkStringSize int64) *RedisParser {
|
maxBulkStringSize int64) *redisParser {
|
||||||
|
|
||||||
return &RedisParser{
|
return &redisParser{
|
||||||
reader: reader,
|
reader: reader,
|
||||||
commands: commands,
|
commands: commands,
|
||||||
maxBulkStringSize: maxBulkStringSize}
|
maxBulkStringSize: maxBulkStringSize}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) Parse() {
|
func (rp *redisParser) parse() {
|
||||||
for line := rp.nextLine(); line != nil && rp.dispatch(line); {
|
for line := rp.nextLine(); line != nil && rp.dispatch(line); {
|
||||||
line = rp.nextLine()
|
line = rp.nextLine()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) nextLine() []byte {
|
func (rp *redisParser) nextLine() []byte {
|
||||||
line, err := rp.reader.ReadBytes('\n')
|
line, err := rp.reader.ReadBytes('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
@ -60,7 +60,7 @@ func (rp *RedisParser) nextLine() []byte {
|
|||||||
return bytes.TrimRight(line, "\r\n")
|
return bytes.TrimRight(line, "\r\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) dispatch(line []byte) bool {
|
func (rp *redisParser) dispatch(line []byte) bool {
|
||||||
if len(line) < 1 {
|
if len(line) < 1 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -79,11 +79,11 @@ func (rp *RedisParser) dispatch(line []byte) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) simpleString(line []byte) bool {
|
func (rp *redisParser) simpleString(line []byte) bool {
|
||||||
return rp.consumeSimpleString(string(line[1:]))
|
return rp.consumeSimpleString(string(line[1:]))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) integer(line []byte) bool {
|
func (rp *redisParser) integer(line []byte) bool {
|
||||||
i, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
i, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rp.consumeError(err)
|
return rp.consumeError(err)
|
||||||
@ -91,7 +91,7 @@ func (rp *RedisParser) integer(line []byte) bool {
|
|||||||
return rp.consumeInteger(i)
|
return rp.consumeInteger(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) bulkString(line []byte) bool {
|
func (rp *redisParser) bulkString(line []byte) bool {
|
||||||
var i int64
|
var i int64
|
||||||
var err error
|
var err error
|
||||||
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
||||||
@ -123,7 +123,7 @@ func (rp *RedisParser) bulkString(line []byte) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) array(line []byte) bool {
|
func (rp *redisParser) array(line []byte) bool {
|
||||||
var i int64
|
var i int64
|
||||||
var err error
|
var err error
|
||||||
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
i, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
||||||
@ -133,7 +133,7 @@ func (rp *RedisParser) array(line []byte) bool {
|
|||||||
return rp.consumeArray(i)
|
return rp.consumeArray(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) push(i interface{}) bool {
|
func (rp *redisParser) push(i interface{}) bool {
|
||||||
rp.args = append(rp.args, i)
|
rp.args = append(rp.args, i)
|
||||||
rp.missing--
|
rp.missing--
|
||||||
if rp.missing <= 0 {
|
if rp.missing <= 0 {
|
||||||
@ -155,7 +155,7 @@ func asString(i interface{}) string {
|
|||||||
return fmt.Sprintf("%s", i)
|
return fmt.Sprintf("%s", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) execute() bool {
|
func (rp *redisParser) execute() bool {
|
||||||
l := len(rp.args)
|
l := len(rp.args)
|
||||||
if l < 1 {
|
if l < 1 {
|
||||||
log.Println("WARN: Too less argument for command.")
|
log.Println("WARN: Too less argument for command.")
|
||||||
@ -174,7 +174,7 @@ func (rp *RedisParser) execute() bool {
|
|||||||
log.Println("WARN: HDEL data are not byte slices.")
|
log.Println("WARN: HDEL data are not byte slices.")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return rp.commands.Hdel(hash, key)
|
return rp.commands.hdel(hash, key)
|
||||||
case "HGET":
|
case "HGET":
|
||||||
if l < 3 {
|
if l < 3 {
|
||||||
log.Println("WARN: Missing argments for HGET.")
|
log.Println("WARN: Missing argments for HGET.")
|
||||||
@ -186,7 +186,7 @@ func (rp *RedisParser) execute() bool {
|
|||||||
log.Println("WARN: HGET data are not byte slices.")
|
log.Println("WARN: HGET data are not byte slices.")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return rp.commands.Hget(hash, key)
|
return rp.commands.hget(hash, key)
|
||||||
|
|
||||||
case "HSET":
|
case "HSET":
|
||||||
if l < 4 {
|
if l < 4 {
|
||||||
@ -201,13 +201,13 @@ func (rp *RedisParser) execute() bool {
|
|||||||
log.Println("WARN: HSET data are not byte slices.")
|
log.Println("WARN: HSET data are not byte slices.")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return rp.commands.Hset(hash, key, value)
|
return rp.commands.hset(hash, key, value)
|
||||||
|
|
||||||
case "MULTI":
|
case "MULTI":
|
||||||
return rp.commands.Multi()
|
return rp.commands.multi()
|
||||||
|
|
||||||
case "EXEC":
|
case "EXEC":
|
||||||
return rp.commands.Exec()
|
return rp.commands.exec()
|
||||||
|
|
||||||
case "HKEYS":
|
case "HKEYS":
|
||||||
if l < 2 {
|
if l < 2 {
|
||||||
@ -219,7 +219,7 @@ func (rp *RedisParser) execute() bool {
|
|||||||
log.Println("WARN: HKEYS data are not byte slices.")
|
log.Println("WARN: HKEYS data are not byte slices.")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return rp.commands.Hkeys(hash)
|
return rp.commands.hkeys(hash)
|
||||||
|
|
||||||
case "HSPATIAL":
|
case "HSPATIAL":
|
||||||
if l < 4 {
|
if l < 4 {
|
||||||
@ -234,33 +234,33 @@ func (rp *RedisParser) execute() bool {
|
|||||||
log.Println("WARN: HSPATIAL data are not byte slices.")
|
log.Println("WARN: HSPATIAL data are not byte slices.")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return rp.commands.HSpatial(hash, first, second)
|
return rp.commands.hSpatial(hash, first, second)
|
||||||
|
|
||||||
case "PING":
|
case "PING":
|
||||||
return rp.commands.Ping()
|
return rp.commands.ping()
|
||||||
}
|
}
|
||||||
log.Printf("WARN: unknown command: '%s'\n", cmd)
|
log.Printf("WARN: unknown command: '%s'\n", cmd)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) consumeSimpleString(s string) bool {
|
func (rp *redisParser) consumeSimpleString(s string) bool {
|
||||||
return rp.push(s)
|
return rp.push(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) consumeBulkString(data []byte) bool {
|
func (rp *redisParser) consumeBulkString(data []byte) bool {
|
||||||
return rp.push(data)
|
return rp.push(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) consumeInteger(i int64) bool {
|
func (rp *redisParser) consumeInteger(i int64) bool {
|
||||||
return rp.push(i)
|
return rp.push(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) consumeError(err error) bool {
|
func (rp *redisParser) consumeError(err error) bool {
|
||||||
log.Printf("error: %s\n", err)
|
log.Printf("error: %s\n", err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RedisParser) consumeArray(i int64) bool {
|
func (rp *redisParser) consumeArray(i int64) bool {
|
||||||
if rp.missing > 0 {
|
if rp.missing > 0 {
|
||||||
log.Println("WARN: Nested arrays are not supported!")
|
log.Println("WARN: Nested arrays are not supported!")
|
||||||
return false
|
return false
|
||||||
|
@ -27,7 +27,7 @@ const (
|
|||||||
rangeSQL = "SELECT pos, data FROM blocks WHERE pos BETWEEN ? AND ? ORDER BY pos"
|
rangeSQL = "SELECT pos, data FROM blocks WHERE pos BETWEEN ? AND ? ORDER BY pos"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SQLiteBackend struct {
|
type sqLiteBackend struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
encoder common.KeyEncoder
|
encoder common.KeyEncoder
|
||||||
decoder common.KeyDecoder
|
decoder common.KeyDecoder
|
||||||
@ -44,16 +44,16 @@ type SQLiteBackend struct {
|
|||||||
rangeStmt *sql.Stmt
|
rangeStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
type SQLiteSession struct {
|
type sqLiteSession struct {
|
||||||
backend *SQLiteBackend
|
backend *sqLiteBackend
|
||||||
tx *sql.Tx
|
tx *sql.Tx
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sqlb *SQLiteBackend) newSession() (session, error) {
|
func (sqlb *sqLiteBackend) newSession() (session, error) {
|
||||||
return &SQLiteSession{sqlb, nil}, nil
|
return &sqLiteSession{sqlb, nil}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) close() error {
|
func (ss *sqLiteSession) close() error {
|
||||||
t := ss.tx
|
t := ss.tx
|
||||||
if t != nil {
|
if t != nil {
|
||||||
ss.tx = nil
|
ss.tx = nil
|
||||||
@ -62,11 +62,11 @@ func (ss *SQLiteSession) close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSQLiteBackend(
|
func newSQLiteBackend(
|
||||||
path string,
|
path string,
|
||||||
changeTracker *changeTracker, interleaved bool) (sqlb *SQLiteBackend, err error) {
|
changeTracker *changeTracker, interleaved bool) (sqlb *sqLiteBackend, err error) {
|
||||||
|
|
||||||
res := SQLiteBackend{interleaved: interleaved, changeTracker: changeTracker}
|
res := sqLiteBackend{interleaved: interleaved, changeTracker: changeTracker}
|
||||||
|
|
||||||
if res.db, err = sql.Open("sqlite3", path); err != nil {
|
if res.db, err = sql.Open("sqlite3", path); err != nil {
|
||||||
return
|
return
|
||||||
@ -130,7 +130,7 @@ func NewSQLiteBackend(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sqlb *SQLiteBackend) buildCoverage() (err error) {
|
func (sqlb *sqLiteBackend) buildCoverage() (err error) {
|
||||||
log.Println("INFO: Start building coverage index (this may take some time)...")
|
log.Println("INFO: Start building coverage index (this may take some time)...")
|
||||||
sqlb.coverage = common.NewCoverage3D()
|
sqlb.coverage = common.NewCoverage3D()
|
||||||
|
|
||||||
@ -170,7 +170,7 @@ func closeDB(db **sql.DB) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sqlb *SQLiteBackend) closeAll() error {
|
func (sqlb *sqLiteBackend) closeAll() error {
|
||||||
closeStmt(&sqlb.deleteStmt)
|
closeStmt(&sqlb.deleteStmt)
|
||||||
closeStmt(&sqlb.fetchStmt)
|
closeStmt(&sqlb.fetchStmt)
|
||||||
closeStmt(&sqlb.insertStmt)
|
closeStmt(&sqlb.insertStmt)
|
||||||
@ -182,21 +182,21 @@ func (sqlb *SQLiteBackend) closeAll() error {
|
|||||||
return closeDB(&sqlb.db)
|
return closeDB(&sqlb.db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sqlb *SQLiteBackend) shutdown() error {
|
func (sqlb *sqLiteBackend) shutdown() error {
|
||||||
globalLock.Lock()
|
globalLock.Lock()
|
||||||
defer globalLock.Unlock()
|
defer globalLock.Unlock()
|
||||||
|
|
||||||
return sqlb.closeAll()
|
return sqlb.closeAll()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) txStmt(stmt *sql.Stmt) *sql.Stmt {
|
func (ss *sqLiteSession) txStmt(stmt *sql.Stmt) *sql.Stmt {
|
||||||
if ss.tx != nil {
|
if ss.tx != nil {
|
||||||
return ss.tx.Stmt(stmt)
|
return ss.tx.Stmt(stmt)
|
||||||
}
|
}
|
||||||
return stmt
|
return stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) del(_, key []byte) (success bool, err error) {
|
func (ss *sqLiteSession) del(_, key []byte) (success bool, err error) {
|
||||||
var pos int64
|
var pos int64
|
||||||
if pos, err = ss.backend.decoder(key); err != nil {
|
if pos, err = ss.backend.decoder(key); err != nil {
|
||||||
return
|
return
|
||||||
@ -226,7 +226,7 @@ func (ss *SQLiteSession) del(_, key []byte) (success bool, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) fetch(_, key []byte) (data []byte, err error) {
|
func (ss *sqLiteSession) fetch(_, key []byte) (data []byte, err error) {
|
||||||
var pos int64
|
var pos int64
|
||||||
if pos, err = ss.backend.decoder(key); err != nil {
|
if pos, err = ss.backend.decoder(key); err != nil {
|
||||||
return
|
return
|
||||||
@ -244,11 +244,11 @@ func (ss *SQLiteSession) fetch(_, key []byte) (data []byte, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) inTransaction() bool {
|
func (ss *sqLiteSession) inTransaction() bool {
|
||||||
return ss.tx != nil
|
return ss.tx != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) store(_, key, value []byte) (exists bool, err error) {
|
func (ss *sqLiteSession) store(_, key, value []byte) (exists bool, err error) {
|
||||||
var pos int64
|
var pos int64
|
||||||
if pos, err = ss.backend.decoder(key); err != nil {
|
if pos, err = ss.backend.decoder(key); err != nil {
|
||||||
return
|
return
|
||||||
@ -295,7 +295,7 @@ func (ss *SQLiteSession) store(_, key, value []byte) (exists bool, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) beginTransaction() (err error) {
|
func (ss *sqLiteSession) beginTransaction() (err error) {
|
||||||
if ss.tx != nil {
|
if ss.tx != nil {
|
||||||
log.Println("WARN: Already running transaction.")
|
log.Println("WARN: Already running transaction.")
|
||||||
return nil
|
return nil
|
||||||
@ -307,7 +307,7 @@ func (ss *SQLiteSession) beginTransaction() (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) commitTransaction() error {
|
func (ss *sqLiteSession) commitTransaction() error {
|
||||||
|
|
||||||
tx := ss.tx
|
tx := ss.tx
|
||||||
if tx == nil {
|
if tx == nil {
|
||||||
@ -321,7 +321,7 @@ func (ss *SQLiteSession) commitTransaction() error {
|
|||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) allKeys(
|
func (ss *sqLiteSession) allKeys(
|
||||||
_ []byte,
|
_ []byte,
|
||||||
done <-chan struct{}) (<-chan []byte, int, error) {
|
done <-chan struct{}) (<-chan []byte, int, error) {
|
||||||
globalLock.RLock()
|
globalLock.RLock()
|
||||||
@ -372,7 +372,7 @@ func (ss *SQLiteSession) allKeys(
|
|||||||
return keys, n, nil
|
return keys, n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) spatialQuery(
|
func (ss *sqLiteSession) spatialQuery(
|
||||||
_, first, second []byte,
|
_, first, second []byte,
|
||||||
done <-chan struct{}) (<-chan block, error) {
|
done <-chan struct{}) (<-chan block, error) {
|
||||||
|
|
||||||
@ -383,7 +383,7 @@ func (ss *SQLiteSession) spatialQuery(
|
|||||||
return ss.plainSpatialQuery(first, second, done)
|
return ss.plainSpatialQuery(first, second, done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) interleavedSpatialQuery(
|
func (ss *sqLiteSession) interleavedSpatialQuery(
|
||||||
first, second []byte,
|
first, second []byte,
|
||||||
done <-chan struct{}) (<-chan block, error) {
|
done <-chan struct{}) (<-chan block, error) {
|
||||||
|
|
||||||
@ -463,7 +463,7 @@ func (ss *SQLiteSession) interleavedSpatialQuery(
|
|||||||
return blocks, nil
|
return blocks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLiteSession) plainSpatialQuery(
|
func (ss *sqLiteSession) plainSpatialQuery(
|
||||||
first, second []byte,
|
first, second []byte,
|
||||||
done <-chan struct{}) (<-chan block, error) {
|
done <-chan struct{}) (<-chan block, error) {
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user