// Copyright 2014, 2015 by Sascha L. Teichmann // Use of this source code is governed by the MIT license // that can be found in the LICENSE file. package main import ( "flag" "fmt" "log" "net" "os" "os/signal" "runtime" "strings" "syscall" "time" "bitbucket.org/s_l_teichmann/mtsatellite/common" ) const ( defaultMaxBulkStringSize = 32 * 1024 * 1024 defaultGCDuration = "24h" defaultChangeDuration = "30s" ) func usage() { fmt.Fprintf(os.Stderr, "Usage: %s [] \n", os.Args[0]) fmt.Fprintln(os.Stderr, "Options:") flag.PrintDefaults() } func main() { var ( port int host string driver string cacheSize int version bool interleaved bool changeURL string gcDuration string changeDuration string maxBulkStringSize int64 ) flag.Usage = usage flag.IntVar(&port, "port", 6379, "port to bind") flag.StringVar(&driver, "driver", "leveldb", "type of database (leveldb, sqlite)") flag.StringVar(&host, "host", "", "host to bind") flag.IntVar(&cacheSize, "cache", 32, "cache size in MB") flag.BoolVar(&version, "version", false, "Print version and exit.") flag.BoolVar(&interleaved, "interleaved", false, "Backend stores key in interleaved form.") flag.StringVar(&gcDuration, "gc-duration", defaultGCDuration, "Duration between forced GCs.") flag.StringVar(&changeDuration, "change-duration", defaultChangeDuration, "Duration to aggregate changes.") flag.StringVar(&changeURL, "change-url", "", "URL to send changes to.") flag.Int64Var(&maxBulkStringSize, "max-bulk-string-size", defaultMaxBulkStringSize, "max size of a bulk string to be accepted as input (in bytes).") flag.Parse() if version { common.PrintVersionAndExit() } if flag.NArg() < 1 { log.Fatal("Missing path to world") } var ( err error backend Backend gcDur time.Duration chDur time.Duration changeTracker *changeTracker ) if gcDur, err = time.ParseDuration(gcDuration); err != nil { log.Fatal(err) } // Setup the change listening stuff. var changeChan <-chan time.Time useChangeNotification := changeURL != "" if useChangeNotification { if chDur, err = time.ParseDuration(changeDuration); err != nil { log.Fatal(err) } changeChan = time.Tick(chDur) changeTracker = newChangeTracker() } else { // We will never receive ticks on this. changeChan = make(<-chan time.Time) } path := flag.Arg(0) if driver == "sqlite" { if backend, err = NewSQLiteBackend(path, changeTracker, interleaved); err != nil { log.Fatal(err) } } else { if backend, err = NewLeveDBBackend( path, changeTracker, interleaved, cacheSize); err != nil { log.Fatal(err) } } defer backend.Shutdown() var listener net.Listener var proto, address string if strings.ContainsRune(host, '/') { proto, address = "unix", host } else { proto, address = "tcp", fmt.Sprintf("%s:%d", host, port) } listener, err = net.Listen(proto, address) if err != nil { log.Fatal(err) } defer listener.Close() log.Printf("Server started at %s\n", listener.Addr()) connChan := make(chan net.Conn) defer close(connChan) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) go func() { for { conn, err := listener.Accept() if err != nil { log.Fatal(err) } log.Printf("Client accepted from: %s\n", conn.RemoteAddr()) connChan <- conn } }() log.Printf("Doing garbage collection every: %s\n", gcDur) gcChan := time.Tick(gcDur) for { select { case conn := <-connChan: var session Session if session, err = backend.NewSession(); err != nil { log.Printf("Cannot create session: %s\n", err) conn.Close() } else { go NewConnection(conn, session, maxBulkStringSize).Run() } case <-sigChan: log.Println("Shutting down") return case <-gcChan: log.Println("Starting garbage collection.") runtime.GC() log.Println("Garbage collection done.") case <-changeChan: if changeTracker != nil { changeTracker.FlushChanges(changeURL) } } } }