mirror of
				https://bitbucket.org/s_l_teichmann/mtsatellite
				synced 2025-10-31 08:05:27 +01:00 
			
		
		
		
	Made maxBulkStringSize a command line option 'max-bulk-string-size'. It defaults to 32MB.
This commit is contained in:
		| @@ -21,16 +21,18 @@ var ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| type Connection struct { | type Connection struct { | ||||||
| 	conn      net.Conn | 	conn              net.Conn | ||||||
| 	session   Session | 	session           Session | ||||||
| 	boolArray []bool | 	maxBulkStringSize int64 | ||||||
|  | 	boolArray         []bool | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewConnection(conn net.Conn, session Session) *Connection { | func NewConnection(conn net.Conn, session Session, maxBulkStringSize int64) *Connection { | ||||||
| 	return &Connection{ | 	return &Connection{ | ||||||
| 		conn:      conn, | 		conn:              conn, | ||||||
| 		session:   session, | 		session:           session, | ||||||
| 		boolArray: []bool{}} | 		maxBulkStringSize: maxBulkStringSize, | ||||||
|  | 		boolArray:         []bool{}} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *Connection) Run() { | func (c *Connection) Run() { | ||||||
| @@ -40,7 +42,7 @@ func (c *Connection) Run() { | |||||||
| 	}() | 	}() | ||||||
| 	rce := NewRedisCommandExecutor(c) | 	rce := NewRedisCommandExecutor(c) | ||||||
| 	r := bufio.NewReaderSize(c.conn, 8*1024) | 	r := bufio.NewReaderSize(c.conn, 8*1024) | ||||||
| 	parser := NewRedisParser(r, rce) | 	parser := NewRedisParser(r, rce, c.maxBulkStringSize) | ||||||
| 	parser.Parse() | 	parser.Parse() | ||||||
| 	log.Println("client disconnected") | 	log.Println("client disconnected") | ||||||
| } | } | ||||||
|   | |||||||
| @@ -16,9 +16,10 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| 	Version        = "0.3" | 	defaultMaxBulkStringSize = 32 * 1024 * 1024 | ||||||
| 	GCDuration     = "24h" | 	Version                  = "0.3" | ||||||
| 	ChangeDuration = "30s" | 	GCDuration               = "24h" | ||||||
|  | 	ChangeDuration           = "30s" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func usage() { | func usage() { | ||||||
| @@ -31,15 +32,16 @@ func usage() { | |||||||
| func main() { | func main() { | ||||||
|  |  | ||||||
| 	var ( | 	var ( | ||||||
| 		port           int | 		port              int | ||||||
| 		host           string | 		host              string | ||||||
| 		driver         string | 		driver            string | ||||||
| 		cacheSize      int | 		cacheSize         int | ||||||
| 		version        bool | 		version           bool | ||||||
| 		interleaved    bool | 		interleaved       bool | ||||||
| 		changeUrl      string | 		changeUrl         string | ||||||
| 		gcDuration     string | 		gcDuration        string | ||||||
| 		changeDuration string | 		changeDuration    string | ||||||
|  | 		maxBulkStringSize int64 | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
| 	flag.Usage = usage | 	flag.Usage = usage | ||||||
| @@ -56,6 +58,8 @@ func main() { | |||||||
| 	flag.StringVar(&changeDuration, | 	flag.StringVar(&changeDuration, | ||||||
| 		"change-duration", ChangeDuration, "Duration to aggregate changes.") | 		"change-duration", ChangeDuration, "Duration to aggregate changes.") | ||||||
| 	flag.StringVar(&changeUrl, "change-url", "", "URL to send changes to.") | 	flag.StringVar(&changeUrl, "change-url", "", "URL to send changes to.") | ||||||
|  | 	flag.Int64Var(&maxBulkStringSize, "max-bulk-string-size", defaultMaxBulkStringSize, | ||||||
|  | 		"max size of a bulk string to be accepted as input (in bytes).") | ||||||
| 	flag.Parse() | 	flag.Parse() | ||||||
|  |  | ||||||
| 	if version { | 	if version { | ||||||
| @@ -147,7 +151,7 @@ func main() { | |||||||
| 				log.Printf("Cannot create session: %s", err) | 				log.Printf("Cannot create session: %s", err) | ||||||
| 				conn.Close() | 				conn.Close() | ||||||
| 			} else { | 			} else { | ||||||
| 				go NewConnection(conn, session).Run() | 				go NewConnection(conn, session, maxBulkStringSize).Run() | ||||||
| 			} | 			} | ||||||
| 		case <-sigChan: | 		case <-sigChan: | ||||||
| 			log.Println("Shutting down") | 			log.Println("Shutting down") | ||||||
|   | |||||||
| @@ -13,8 +13,6 @@ import ( | |||||||
| 	"strconv" | 	"strconv" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const maxBulkStringSize = 8 * 1024 * 1024 |  | ||||||
|  |  | ||||||
| type RedisConsumer interface { | type RedisConsumer interface { | ||||||
| 	ConsumeInteger(int64) bool | 	ConsumeInteger(int64) bool | ||||||
| 	ConsumeArray(int64) bool | 	ConsumeArray(int64) bool | ||||||
| @@ -24,14 +22,17 @@ type RedisConsumer interface { | |||||||
| } | } | ||||||
|  |  | ||||||
| type RedisParser struct { | type RedisParser struct { | ||||||
| 	reader   *bufio.Reader | 	reader            *bufio.Reader | ||||||
| 	consumer RedisConsumer | 	consumer          RedisConsumer | ||||||
|  | 	maxBulkStringSize int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewRedisParser(reader *bufio.Reader, consumer RedisConsumer) *RedisParser { | func NewRedisParser(reader *bufio.Reader, consumer RedisConsumer, | ||||||
|  | 	maxBulkStringSize int64) *RedisParser { | ||||||
| 	return &RedisParser{ | 	return &RedisParser{ | ||||||
| 		reader:   reader, | 		reader:            reader, | ||||||
| 		consumer: consumer} | 		consumer:          consumer, | ||||||
|  | 		maxBulkStringSize: maxBulkStringSize} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (rp *RedisParser) Parse() { | func (rp *RedisParser) Parse() { | ||||||
| @@ -94,7 +95,7 @@ func (rp *RedisParser) bulkString(line []byte) bool { | |||||||
| 	case i == 0: | 	case i == 0: | ||||||
| 		return rp.consumer.ConsumeBulkString([]byte{}) | 		return rp.consumer.ConsumeBulkString([]byte{}) | ||||||
| 	default: | 	default: | ||||||
| 		if i > maxBulkStringSize { // prevent denial of service. | 		if i > rp.maxBulkStringSize { // prevent denial of service. | ||||||
| 			return rp.consumer.ConsumeError( | 			return rp.consumer.ConsumeError( | ||||||
| 				fmt.Errorf("Bulk string too large (%d bytes).\n", i)) | 				fmt.Errorf("Bulk string too large (%d bytes).\n", i)) | ||||||
| 		} | 		} | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user