mtsatellite/cmd/mtredisalize/main.go

170 lines
3.9 KiB
Go
Raw Normal View History

// Copyright 2014, 2015 by Sascha L. Teichmann
2014-08-03 15:59:56 +02:00
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
2014-08-03 00:01:14 +02:00
package main
import (
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"runtime"
	"strconv"
	"syscall"
	"time"

	"bitbucket.org/s_l_teichmann/mtsatellite/common"
)
// Default values for the command line options registered in main.
const (
	// defaultGCDuration is the default of the -gc-duration flag.
	defaultGCDuration = "24h"

	// defaultChangeDuration is the default of the -change-duration flag.
	defaultChangeDuration = "30s"

	// defaultMaxBulkStringSize is the default of the -max-bulk-string-size
	// flag in bytes (32 MiB).
	defaultMaxBulkStringSize = 32 << 20
)
2014-08-06 10:04:24 +02:00
2014-09-07 15:02:39 +02:00
// usage prints a short synopsis followed by the defaults of all registered
// flags to standard error. It is installed as flag.Usage by main.
func usage() {
	program := os.Args[0]
	stderr := os.Stderr

	fmt.Fprintf(stderr, "Usage: %s [<options>] <database>\n", program)
	fmt.Fprintln(stderr, "Options:")

	flag.PrintDefaults()
}
2014-08-03 00:01:14 +02:00
func main() {
var (
port int
host string
driver string
cacheSize int
version bool
interleaved bool
2015-05-27 16:57:08 +02:00
changeURL string
gcDuration string
changeDuration string
maxBulkStringSize int64
)
2014-08-03 00:01:14 +02:00
2014-09-07 15:02:39 +02:00
flag.Usage = usage
2014-08-03 00:01:14 +02:00
flag.IntVar(&port, "port", 6379, "port to bind")
2014-08-03 14:52:24 +02:00
flag.StringVar(&driver, "driver", "leveldb", "type of database (leveldb, sqlite)")
2014-08-03 00:01:14 +02:00
flag.StringVar(&host, "host", "", "host to bind")
flag.IntVar(&cacheSize, "cache", 32, "cache size in MB")
2014-08-06 10:04:24 +02:00
flag.BoolVar(&version, "version", false, "Print version and exit.")
flag.BoolVar(&interleaved,
"interleaved", false, "Backend stores key in interleaved form.")
flag.StringVar(&gcDuration,
"gc-duration", defaultGCDuration, "Duration between forced GCs.")
flag.StringVar(&changeDuration,
"change-duration", defaultChangeDuration, "Duration to aggregate changes.")
2015-05-27 16:57:08 +02:00
flag.StringVar(&changeURL, "change-url", "", "URL to send changes to.")
flag.Int64Var(&maxBulkStringSize, "max-bulk-string-size", defaultMaxBulkStringSize,
"max size of a bulk string to be accepted as input (in bytes).")
2014-08-03 00:01:14 +02:00
flag.Parse()
2014-08-06 10:04:24 +02:00
if version {
common.PrintVersionAndExit()
2014-08-06 10:04:24 +02:00
}
if flag.NArg() < 1 {
2014-08-03 00:01:14 +02:00
log.Fatal("Missing path to world")
}
var (
err error
backend Backend
gcDur time.Duration
chDur time.Duration
changeTracker *changeTracker
)
if gcDur, err = time.ParseDuration(gcDuration); err != nil {
log.Fatal(err)
}
2014-08-03 00:01:14 +02:00
// Setup the change listening stuff.
var changeChan <-chan time.Time
2015-05-27 16:57:08 +02:00
useChangeNotification := changeURL != ""
if useChangeNotification {
if chDur, err = time.ParseDuration(changeDuration); err != nil {
log.Fatal(err)
}
changeChan = time.Tick(chDur)
changeTracker = newChangeTracker()
} else {
// We will never receive ticks on this.
changeChan = make(<-chan time.Time)
}
path := flag.Arg(0)
2014-08-03 15:42:12 +02:00
if driver == "sqlite" {
if backend, err = NewSqliteBackend(path, changeTracker, interleaved); err != nil {
2014-08-03 15:42:12 +02:00
log.Fatal(err)
}
} else {
if backend, err = NewLeveDBBackend(
path, changeTracker, interleaved, cacheSize); err != nil {
2014-08-03 15:42:12 +02:00
log.Fatal(err)
}
}
defer backend.Shutdown()
2014-08-03 00:01:14 +02:00
var listener net.Listener
listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
log.Fatal(err)
}
defer listener.Close()
log.Printf("Server started at %s\n", listener.Addr())
2014-08-03 00:01:14 +02:00
connChan := make(chan net.Conn)
defer close(connChan)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, os.Kill)
go func() {
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal(err)
}
log.Printf("Client accepted from: %s\n", conn.RemoteAddr())
2014-08-03 00:01:14 +02:00
connChan <- conn
}
}()
log.Printf("Doing garbage collection every: %s\n", gcDur)
gcChan := time.Tick(gcDur)
2014-08-03 00:01:14 +02:00
for {
select {
case conn := <-connChan:
var session Session
if session, err = backend.NewSession(); err != nil {
log.Printf("Cannot create session: %s\n", err)
conn.Close()
} else {
go NewConnection(conn, session, maxBulkStringSize).Run()
}
2014-08-03 00:01:14 +02:00
case <-sigChan:
log.Println("Shutting down")
return
case <-gcChan:
log.Println("Starting garbage collection.")
runtime.GC()
log.Println("Garbage collection done.")
case <-changeChan:
if changeTracker != nil {
2015-05-27 16:57:08 +02:00
changeTracker.FlushChanges(changeURL)
}
2014-08-03 00:01:14 +02:00
}
}
}