diff --git a/lib/Segment/Consumer/ForkCurl.php b/lib/Segment/Consumer/ForkCurl.php index c7b1f9e..e837ead 100644 --- a/lib/Segment/Consumer/ForkCurl.php +++ b/lib/Segment/Consumer/ForkCurl.php @@ -10,7 +10,7 @@ class Segment_Consumer_ForkCurl extends Segment_QueueConsumer { * @param array $options * boolean "debug" - whether to use debug output, wait for response. * number "max_queue_size" - the max size of messages to enqueue - * number "batch_size" - how many messages to send in a single request + * number "flush_at" - how many messages to send in a single request */ public function __construct($secret, $options = array()) { parent::__construct($secret, $options); diff --git a/lib/Segment/Consumer/LibCurl.php b/lib/Segment/Consumer/LibCurl.php index 052f4bb..09e9500 100644 --- a/lib/Segment/Consumer/LibCurl.php +++ b/lib/Segment/Consumer/LibCurl.php @@ -9,7 +9,7 @@ class Segment_Consumer_LibCurl extends Segment_QueueConsumer { * @param array $options * boolean "debug" - whether to use debug output, wait for response. * number "max_queue_size" - the max size of messages to enqueue - * number "batch_size" - how many messages to send in a single request + * number "flush_at" - how many messages to send in a single request */ public function __construct($secret, $options = array()) { parent::__construct($secret, $options); diff --git a/lib/Segment/QueueConsumer.php b/lib/Segment/QueueConsumer.php index 062d3a1..5008de0 100644 --- a/lib/Segment/QueueConsumer.php +++ b/lib/Segment/QueueConsumer.php @@ -7,12 +7,14 @@ abstract class Segment_QueueConsumer extends Segment_Consumer { protected $max_queue_size = 1000; protected $max_queue_size_bytes = 33554432; //32M - protected $batch_size = 100; + protected $flush_at = 100; protected $max_batch_size_bytes = 512000; //500kb protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s protected $host = ""; protected $compress_request = false; + protected $flush_interval_in_mills = 10000; //frequency in milliseconds to send data, default 10 + /** * Store our secret and options as part of this consumer * @param string $secret @@ -26,9 +28,25 @@ public function __construct($secret, $options = array()) { } if (isset($options["batch_size"])) { - $this->batch_size = $options["batch_size"]; + if($options["batch_size"] < 1) { + $msg = "Batch Size must not be less than 1"; + error_log("[Analytics][" . $this->type . "] " . $msg); + } else { + $msg = "WARNING: batch_size option to be depricated soon, please use new option flush_at"; + error_log("[Analytics][" . $this->type . "] " . $msg); + $this->flush_at = $options["batch_size"]; + } } + if (isset($options["flush_at"])) { + if($options["flush_at"] < 1) { + $msg = "Flush at Size must not be less than 1"; + error_log("[Analytics][" . $this->type . "] " . $msg); + } else { + $this->flush_at = $options["flush_at"]; + } + } + if (isset($options["host"])) { $this->host = $options["host"]; } @@ -36,6 +54,15 @@ public function __construct($secret, $options = array()) { if (isset($options["compress_request"])) { $this->compress_request = json_decode($options["compress_request"]); } + + if (isset($options["flush_interval"])) { + if($options["flush_interval"] < 1000) { + $msg = "Flush interval must not be less than 1 second"; + error_log("[Analytics][" . $this->type . "] " . $msg); + } else { + $this->flush_interval_in_mills = $options["flush_interval"]; + } + } $this->queue = array(); } @@ -113,7 +140,8 @@ public function flush() { $success = true; while ($count > 0 && $success) { - $batch = array_splice($this->queue, 0, min($this->batch_size, $count)); + + $batch = array_splice($this->queue, 0, min($this->flush_at, $count)); if (mb_strlen(serialize((array)$this->queue), '8bit') >= $this->max_batch_size_bytes) { $msg = "Batch size is larger than 500KB"; @@ -125,6 +153,8 @@ public function flush() { $success = $this->flushBatch($batch); $count = count($this->queue); + + usleep($this->flush_interval_in_mills * 1000); } return $success; @@ -152,7 +182,7 @@ protected function enqueue($item) { $count = array_push($this->queue, $item); - if ($count >= $this->batch_size) { + if ($count >= $this->flush_at) { return $this->flush(); // return ->flush() result: true on success }