// Copyright 2014 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "flag" "fmt" "log" "os" "sync" "bitbucket.org/s_l_teichmann/mtsatellite/common" ) func usage() { fmt.Fprintf(os.Stderr, "Usage: %s [] \n", os.Args[0]) fmt.Fprintln(os.Stderr, "Options:") flag.PrintDefaults() } func selectKeySplitter(interleaved bool) common.KeySplitter { if interleaved { return common.InterleavedToCoord } return common.PlainToCoord } func selectKeyJoiner(interleaved bool) common.KeyJoiner { if interleaved { return common.CoordToInterleaved } return common.CoordToPlain } func selectKeyDecoder(interleaved bool) common.KeyDecoder { if interleaved { return common.DecodeFromBigEndian } return common.DecodeStringFromBytes } func selectKeyEncoder(interleaved bool) common.KeyEncoder { if interleaved { return common.EncodeToBigEndian } return common.EncodeStringToBytes } func copyProducerToConsumer(producer common.BlockProducer, consumer common.BlockConsumer) error { blocks := make(chan *common.Block) done := make(chan struct{}) defer close(done) pool := sync.Pool{New: func() interface{} { return new(common.Block) }} go func() { defer close(blocks) for { block := pool.Get().(*common.Block) if err := producer.Next(block); err != nil { if err != common.ErrNoMoreBlocks { log.Printf("Reading failed: %s\n", err) } return } select { case blocks <- block: case <-done: return } } }() i := 0 for block := range blocks { if err := consumer.Consume(block); err != nil { return err } block.Data = nil pool.Put(block) i++ if i%1000 == 0 { log.Printf("%d blocks transferred.\n", i) } } log.Printf("%d blocks transferred in total.\n", i) return nil } func main() { var ( srcBackend string dstBackend string srcInterleaved bool dstInterleaved bool ) flag.Usage = usage flag.StringVar(&srcBackend, "source-backend", "sqlite", "type of source database (leveldb, sqlite)") flag.StringVar(&srcBackend, "sb", "sqlite", "type of source database (leveldb, sqlite). Shorthand") flag.StringVar(&dstBackend, "dest-backend", "leveldb", "type of destination database (leveldb, sqlite)") flag.StringVar(&dstBackend, "db", "leveldb", "type of destination database (leveldb, sqlite). Shorthand") flag.BoolVar(&srcInterleaved, "source-interleaved", false, "Is source database interleaved?") flag.BoolVar(&srcInterleaved, "si", false, "Is source database interleaved? Shorthand") flag.BoolVar(&dstInterleaved, "dest-interleaved", true, "Should dest database be interleaved?") flag.BoolVar(&dstInterleaved, "di", true, "Should source database be interleaved? Shorthand") flag.Parse() if flag.NArg() < 2 { log.Fatal("Missing source and/or destination database.") } var ( producer common.BlockProducer consumer common.BlockConsumer err error ) if srcBackend == "sqlite" { if producer, err = NewSQLiteBlockProducer( flag.Arg(0), selectKeySplitter(srcInterleaved)); err != nil { log.Fatalf("Cannot open '%s': %s", flag.Arg(0), err) } } else { // LevelDB if producer, err = NewLevelDBBlockProducer( flag.Arg(0), selectKeySplitter(srcInterleaved), selectKeyDecoder(srcInterleaved)); err != nil { log.Fatalf("Cannot open '%s': %s", flag.Arg(0), err) } } defer producer.Close() if dstBackend == "sqlite" { if consumer, err = NewSQLiteBlockConsumer( flag.Arg(1), selectKeyJoiner(dstInterleaved)); err != nil { log.Fatalf("Cannot open '%s': %s", flag.Arg(1), err) } } else { // LevelDB if consumer, err = NewLevelDBBlockConsumer( flag.Arg(1), selectKeyJoiner(dstInterleaved), selectKeyEncoder(dstInterleaved)); err != nil { log.Fatalf("Cannot open '%s': %s", flag.Arg(1), err) } } defer consumer.Close() if err = copyProducerToConsumer(producer, consumer); err != nil { log.Fatalf("Database transfer failed: %s\n", err) } }