package main import ( "bufio" "flag" "fmt" "log" "net" "os" "os/signal" leveldb "github.com/jmhodges/levigo" ) type connection struct { conn net.Conn db *leveldb.DB tx *leveldb.WriteBatch intArray []int } func newConnection(conn net.Conn, db *leveldb.DB) *connection { return &connection{ conn, db, nil, []int{}} } func (c *connection) run() { defer c.conn.Close() rce := NewRedisCommandExecutor(c) r := bufio.NewReader(c.conn) parser := NewRedisParser(r, rce) parser.Parse() } func (c *connection) Hget(hash, key []byte) bool { log.Printf("client requested block: %q", key) var err error var data []byte ro := leveldb.NewReadOptions() defer ro.Close() if data, err = c.db.Get(ro, key); err != nil { log.Printf("Something is wrong with db: %s", err) if err = c.writeError(); err != nil { log.Printf("Send message to client failed: %s", err) return false } } else { if err = c.writeBlock(data); err != nil { log.Printf("Send message to client failed: %s", err) return false } } return true } func (c *connection) Hset(hash, key, block []byte) bool { log.Printf("client wants to store block: %q", key) var err error var exists int if exists, err = c.keyExists(key); err != nil { log.Printf("Something is wrong with db: %s", err) if err = c.writeError(); err != nil { log.Printf("Writing message to client failed: %s", err) return false } return true } if c.tx != nil { c.tx.Put(key, block) c.intArray = append(c.intArray, exists) if err = c.writeQueued(); err != nil { log.Printf("Writing message to client failed: %s", err) return false } return true } wo := leveldb.NewWriteOptions() defer wo.Close() if err = c.db.Put(wo, key, block); err != nil { log.Printf("Something is wrong with db: %s", err) if err = c.writeError(); err != nil { log.Printf("Writing message to client failed: %s", err) return false } return true } if err = c.writeInteger(exists); err != nil { log.Printf("Writing message to client failed: %s", err) return false } return true } func (c *connection) keyExists(key []byte) (exists int, err error) { ro := leveldb.NewReadOptions() defer ro.Close() var data []byte if data, err = c.db.Get(ro, key); err != nil { return } if data != nil { exists = 1 } else { exists = 0 } return } func (c *connection) Multi() bool { if c.tx != nil { log.Println("WARN: Already running transaction.") } else { c.tx = leveldb.NewWriteBatch() } if err := c.writeOk(); err != nil { log.Printf("Writing message to client failed: %s", err) return false } return true } func (c *connection) Exec() bool { var err error if c.tx == nil { if err = c.writeEmptyArray(); err != nil { log.Printf("Writing message to client failed: %s", err) return false } return true } tx := c.tx c.tx = nil defer tx.Close() arr := c.intArray c.intArray = []int{} wo := leveldb.NewWriteOptions() defer wo.Close() if err = c.db.Write(wo, tx); err != nil { log.Printf("Something went wrong in writing transaction: %s", err) if err = c.writeError(); err != nil { log.Printf("Writing message to client failed: %s", err) return false } return true } if err = c.writeIntegerArray(arr); err != nil { log.Printf("Writing message to client failed: %s", err) return false } return true } var ( redisOk = []byte("+OK\r\n") redisDbError = []byte("-FAIL\r\n") redisNoSuchBlock = []byte("$-1\r\n") redisCrnl = []byte("\r\n") redisEmptyArray = []byte("*0\r\n") redisQueued = []byte("+QUEUED\r\n") ) func (c *connection) writeError() (err error) { _, err = c.conn.Write(redisDbError) return } func (c *connection) writeEmptyArray() (err error) { _, err = c.conn.Write(redisDbError) return } func (c *connection) writeInteger(v int) (err error) { _, err = c.conn.Write([]byte(fmt.Sprintf(":%d\r\n", v))) return } func (c *connection) writeIntegerArray(arr []int) (err error) { if _, err = c.conn.Write([]byte(fmt.Sprintf("*%d\r\n", len(arr)))); err != nil { return } for x := range arr { if err = c.writeInteger(x); err != nil { return } } return } func (c *connection) writeOk() (err error) { _, err = c.conn.Write(redisOk) return } func (c *connection) writeQueued() (err error) { _, err = c.conn.Write(redisQueued) return } func (c *connection) writeBlock(data []byte) (err error) { con := c.conn if data == nil { _, err = con.Write(redisNoSuchBlock) } else { if _, err = con.Write([]byte(fmt.Sprintf("$%d\r\n", len(data)))); err != nil { return } if _, err = con.Write(data); err != nil { return } _, err = con.Write(redisCrnl) } return } func main() { var port int var host string var cacheSize int flag.IntVar(&port, "port", 6379, "port to bind") flag.StringVar(&host, "host", "", "host to bind") flag.IntVar(&cacheSize, "cache", 32, "cache size in MB") flag.Parse() args := flag.Args() if len(args) < 1 { log.Fatal("Missing path to world") } cache := leveldb.NewLRUCache(cacheSize * 1024 * 1024) defer cache.Close() opts := leveldb.NewOptions() opts.SetCache(cache) opts.SetCreateIfMissing(true) var err error var db *leveldb.DB if db, err = leveldb.Open(args[0], opts); err != nil { log.Fatal(err) } defer db.Close() var listener net.Listener listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) if err != nil { log.Fatal(err) } defer listener.Close() log.Printf("Server started at %s", listener.Addr()) connChan := make(chan net.Conn) defer close(connChan) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, os.Kill) go func() { for { conn, err := listener.Accept() if err != nil { log.Fatal(err) } log.Printf("Client accepted from: %s", conn.RemoteAddr()) connChan <- conn } }() for { select { case conn := <-connChan: go newConnection(conn, db).run() case <-sigChan: log.Println("Shutting down") return } } }