From 887deb912adebef7ef4da1f39cff267ce5d2a22c Mon Sep 17 00:00:00 2001 From: "Shane L. Duvall" Date: Tue, 17 Nov 2020 12:14:14 -0600 Subject: [PATCH 1/2] Issue #135: add default flush_interval option in constructor and allow it to be user configurable with 1 second limit and add error message if set lower that 1. add flush_at option and refactor batch_size option; add configurable limit to be no less that 1 and add error message if set lower that 1; also warn user of batch_size future deprecation --- lib/Segment/Consumer/ForkCurl.php | 2 +- lib/Segment/Consumer/LibCurl.php | 2 +- lib/Segment/QueueConsumer.php | 36 ++++++++++++++++++++++++++++--- 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/lib/Segment/Consumer/ForkCurl.php b/lib/Segment/Consumer/ForkCurl.php index c7b1f9e..e837ead 100644 --- a/lib/Segment/Consumer/ForkCurl.php +++ b/lib/Segment/Consumer/ForkCurl.php @@ -10,7 +10,7 @@ class Segment_Consumer_ForkCurl extends Segment_QueueConsumer { * @param array $options * boolean "debug" - whether to use debug output, wait for response. * number "max_queue_size" - the max size of messages to enqueue - * number "batch_size" - how many messages to send in a single request + * number "flush_at" - how many messages to send in a single request */ public function __construct($secret, $options = array()) { parent::__construct($secret, $options); diff --git a/lib/Segment/Consumer/LibCurl.php b/lib/Segment/Consumer/LibCurl.php index 052f4bb..09e9500 100644 --- a/lib/Segment/Consumer/LibCurl.php +++ b/lib/Segment/Consumer/LibCurl.php @@ -9,7 +9,7 @@ class Segment_Consumer_LibCurl extends Segment_QueueConsumer { * @param array $options * boolean "debug" - whether to use debug output, wait for response. * number "max_queue_size" - the max size of messages to enqueue - * number "batch_size" - how many messages to send in a single request + * number "flush_at" - how many messages to send in a single request */ public function __construct($secret, $options = array()) { parent::__construct($secret, $options); diff --git a/lib/Segment/QueueConsumer.php b/lib/Segment/QueueConsumer.php index 062d3a1..9125042 100644 --- a/lib/Segment/QueueConsumer.php +++ b/lib/Segment/QueueConsumer.php @@ -7,12 +7,14 @@ abstract class Segment_QueueConsumer extends Segment_Consumer { protected $max_queue_size = 1000; protected $max_queue_size_bytes = 33554432; //32M - protected $batch_size = 100; + protected $flush_at = 100; protected $max_batch_size_bytes = 512000; //500kb protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s protected $host = ""; protected $compress_request = false; + protected $flush_interval_in_mills = 10000; //frequency in milliseconds to send data, default 10 + /** * Store our secret and options as part of this consumer * @param string $secret @@ -26,9 +28,25 @@ public function __construct($secret, $options = array()) { } if (isset($options["batch_size"])) { - $this->batch_size = $options["batch_size"]; + if($options["batch_size"] < 1) { + $msg = "Batch Size must not be less than 1"; + error_log("[Analytics][" . $this->type . "] " . $msg); + } else { + $msg = "WARNING: batch_size option to be depricated soon, please use new option flush_at"; + error_log("[Analytics][" . $this->type . "] " . $msg); + $this->flush_at = $options["batch_size"]; + } } + if (isset($options["flush_at"])) { + if($options["flush_at"] < 1) { + $msg = "Flush at Size must not be less than 1"; + error_log("[Analytics][" . $this->type . "] " . $msg); + } else { + $this->flush_at = $options["flush_at"]; + } + } + if (isset($options["host"])) { $this->host = $options["host"]; } @@ -36,6 +54,15 @@ public function __construct($secret, $options = array()) { if (isset($options["compress_request"])) { $this->compress_request = json_decode($options["compress_request"]); } + + if (isset($options["flush_interval"])) { + if($options["flush_interval"] < 1000) { + $msg = "Flush interval must not be less than 1 second"; + error_log("[Analytics][" . $this->type . "] " . $msg); + } else { + $this->flush_interval_in_mills = $options["flush_interval"]; + } + } $this->queue = array(); } @@ -113,6 +140,7 @@ public function flush() { $success = true; while ($count > 0 && $success) { + $batch = array_splice($this->queue, 0, min($this->batch_size, $count)); if (mb_strlen(serialize((array)$this->queue), '8bit') >= $this->max_batch_size_bytes) { @@ -125,6 +153,8 @@ public function flush() { $success = $this->flushBatch($batch); $count = count($this->queue); + + usleep($this->flush_interval_in_mills * 1000); } return $success; @@ -152,7 +182,7 @@ protected function enqueue($item) { $count = array_push($this->queue, $item); - if ($count >= $this->batch_size) { + if ($count >= $this->flush_at) { return $this->flush(); // return ->flush() result: true on success } From 96ee294cedaf6094f9135880cf62adb2c3fd36fd Mon Sep 17 00:00:00 2001 From: "Shane L. Duvall" Date: Tue, 17 Nov 2020 14:54:17 -0600 Subject: [PATCH 2/2] update missed batch_size call --- lib/Segment/QueueConsumer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Segment/QueueConsumer.php b/lib/Segment/QueueConsumer.php index 9125042..5008de0 100644 --- a/lib/Segment/QueueConsumer.php +++ b/lib/Segment/QueueConsumer.php @@ -141,7 +141,7 @@ public function flush() { while ($count > 0 && $success) { - $batch = array_splice($this->queue, 0, min($this->batch_size, $count)); + $batch = array_splice($this->queue, 0, min($this->flush_at, $count)); if (mb_strlen(serialize((array)$this->queue), '8bit') >= $this->max_batch_size_bytes) { $msg = "Batch size is larger than 500KB";