Skip to content
2 changes: 1 addition & 1 deletion .buildscript/e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if [ "$RUN_E2E_TESTS" != "true" ]; then
echo "Skipping end to end tests."
else
echo "Running end to end tests..."
wget https://github.com/segmentio/library-e2e-tester/releases/download/0.2.1/tester_linux_amd64 -O tester
wget https://github.com/segmentio/library-e2e-tester/releases/download/0.4.1-pre1/tester_linux_amd64 -O tester
chmod +x tester
./tester -path='./bin/analytics'
echo "End to end tests completed!"
Expand Down
6 changes: 2 additions & 4 deletions lib/Segment/Consumer/ForkCurl.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,8 @@ public function flushBatch($messages) {

// Verify message size is below than 32KB
if (strlen($payload) >= 32 * 1024) {
if ($this->debug()) {
$msg = "Message size is larger than 32KB";
error_log("[Analytics][" . $this->type . "] " . $msg);
}
$msg = "Message size is larger than 32KB";
error_log("[Analytics][" . $this->type . "] " . $msg);

return false;
}
Expand Down
6 changes: 2 additions & 4 deletions lib/Segment/Consumer/LibCurl.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ public function flushBatch($messages) {

// Verify message size is below than 32KB
if (strlen($payload) >= 32 * 1024) {
if ($this->debug()) {
$msg = "Message size is larger than 32KB";
error_log("[Analytics][" . $this->type . "] " . $msg);
}
$msg = "Message size is larger than 32KB";
error_log("[Analytics][" . $this->type . "] " . $msg);

return false;
}
Expand Down
6 changes: 2 additions & 4 deletions lib/Segment/Consumer/Socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,8 @@ private function createBody($host, $content) {

// Verify message size is below than 32KB
if (strlen($req) >= 32 * 1024) {
if ($this->debug()) {
$msg = "Message size is larger than 32KB";
error_log("[Analytics][" . $this->type . "] " . $msg);
}
$msg = "Message size is larger than 32KB";
error_log("[Analytics][" . $this->type . "] " . $msg);

return false;
}
Expand Down
19 changes: 19 additions & 0 deletions lib/Segment/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ abstract class Segment_QueueConsumer extends Segment_Consumer {

protected $queue;
protected $max_queue_size = 1000;
protected $max_queue_size_bytes = 33554432; //32M

protected $batch_size = 100;
protected $max_batch_size_bytes = 512000; //500kb
protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s
protected $host = "";
protected $compress_request = false;
Expand Down Expand Up @@ -111,6 +114,14 @@ public function flush() {

while ($count > 0 && $success) {
$batch = array_splice($this->queue, 0, min($this->batch_size, $count));

if (mb_strlen(serialize((array)$this->queue), '8bit') >= $this->max_batch_size_bytes) {
$msg = "Batch size is larger than 500KB";
error_log("[Analytics][" . $this->type . "] " . $msg);

return false;
}

$success = $this->flushBatch($batch);

$count = count($this->queue);
Expand All @@ -131,7 +142,15 @@ protected function enqueue($item) {
return false;
}

if (mb_strlen(serialize((array)$this->queue), '8bit') >= $this->max_queue_size_bytes) {
$msg = "Queue size is larger than 32MB";
error_log("[Analytics][" . $this->type . "] " . $msg);

return false;
}

$count = array_push($this->queue, $item);


if ($count >= $this->batch_size) {
return $this->flush(); // return ->flush() result: true on success
Expand Down