diff --git a/.buildscript/e2e.sh b/.buildscript/e2e.sh index ed25afe..5ccdab4 100755 --- a/.buildscript/e2e.sh +++ b/.buildscript/e2e.sh @@ -6,7 +6,7 @@ if [ "$RUN_E2E_TESTS" != "true" ]; then echo "Skipping end to end tests." else echo "Running end to end tests..." - wget https://github.com/segmentio/library-e2e-tester/releases/download/0.2.1/tester_linux_amd64 -O tester + wget https://github.com/segmentio/library-e2e-tester/releases/download/0.4.1-pre1/tester_linux_amd64 -O tester chmod +x tester ./tester -path='./bin/analytics' echo "End to end tests completed!" diff --git a/lib/Segment/Consumer/ForkCurl.php b/lib/Segment/Consumer/ForkCurl.php index 6f73a00..c7b1f9e 100644 --- a/lib/Segment/Consumer/ForkCurl.php +++ b/lib/Segment/Consumer/ForkCurl.php @@ -69,10 +69,8 @@ public function flushBatch($messages) { // Verify message size is below than 32KB if (strlen($payload) >= 32 * 1024) { - if ($this->debug()) { - $msg = "Message size is larger than 32KB"; - error_log("[Analytics][" . $this->type . "] " . $msg); - } + $msg = "Message size is larger than 32KB"; + error_log("[Analytics][" . $this->type . "] " . $msg); return false; } diff --git a/lib/Segment/Consumer/LibCurl.php b/lib/Segment/Consumer/LibCurl.php index 842a803..052f4bb 100644 --- a/lib/Segment/Consumer/LibCurl.php +++ b/lib/Segment/Consumer/LibCurl.php @@ -34,10 +34,8 @@ public function flushBatch($messages) { // Verify message size is below than 32KB if (strlen($payload) >= 32 * 1024) { - if ($this->debug()) { - $msg = "Message size is larger than 32KB"; - error_log("[Analytics][" . $this->type . "] " . $msg); - } + $msg = "Message size is larger than 32KB"; + error_log("[Analytics][" . $this->type . "] " . $msg); return false; } diff --git a/lib/Segment/Consumer/Socket.php b/lib/Segment/Consumer/Socket.php index 2bec79b..ae58e0f 100644 --- a/lib/Segment/Consumer/Socket.php +++ b/lib/Segment/Consumer/Socket.php @@ -193,10 +193,8 @@ private function createBody($host, $content) { // Verify message size is below than 32KB if (strlen($req) >= 32 * 1024) { - if ($this->debug()) { - $msg = "Message size is larger than 32KB"; - error_log("[Analytics][" . $this->type . "] " . $msg); - } + $msg = "Message size is larger than 32KB"; + error_log("[Analytics][" . $this->type . "] " . $msg); return false; } diff --git a/lib/Segment/QueueConsumer.php b/lib/Segment/QueueConsumer.php index 682950f..062d3a1 100644 --- a/lib/Segment/QueueConsumer.php +++ b/lib/Segment/QueueConsumer.php @@ -5,7 +5,10 @@ abstract class Segment_QueueConsumer extends Segment_Consumer { protected $queue; protected $max_queue_size = 1000; + protected $max_queue_size_bytes = 33554432; //32M + protected $batch_size = 100; + protected $max_batch_size_bytes = 512000; //500kb protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s protected $host = ""; protected $compress_request = false; @@ -111,6 +114,14 @@ public function flush() { while ($count > 0 && $success) { $batch = array_splice($this->queue, 0, min($this->batch_size, $count)); + + if (mb_strlen(serialize((array)$this->queue), '8bit') >= $this->max_batch_size_bytes) { + $msg = "Batch size is larger than 500KB"; + error_log("[Analytics][" . $this->type . "] " . $msg); + + return false; + } + $success = $this->flushBatch($batch); $count = count($this->queue); @@ -131,7 +142,15 @@ protected function enqueue($item) { return false; } + if (mb_strlen(serialize((array)$this->queue), '8bit') >= $this->max_queue_size_bytes) { + $msg = "Queue size is larger than 32MB"; + error_log("[Analytics][" . $this->type . "] " . $msg); + + return false; + } + $count = array_push($this->queue, $item); + if ($count >= $this->batch_size) { return $this->flush(); // return ->flush() result: true on success