Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/Segment/Consumer/ForkCurl.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Segment_Consumer_ForkCurl extends Segment_QueueConsumer {
* @param array $options
* boolean "debug" - whether to use debug output, wait for response.
* number "max_queue_size" - the max size of messages to enqueue
* number "batch_size" - how many messages to send in a single request
* number "flush_at" - how many messages to send in a single request
*/
public function __construct($secret, $options = array()) {
parent::__construct($secret, $options);
Expand Down
2 changes: 1 addition & 1 deletion lib/Segment/Consumer/LibCurl.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Segment_Consumer_LibCurl extends Segment_QueueConsumer {
* @param array $options
* boolean "debug" - whether to use debug output, wait for response.
* number "max_queue_size" - the max size of messages to enqueue
* number "batch_size" - how many messages to send in a single request
* number "flush_at" - how many messages to send in a single request
*/
public function __construct($secret, $options = array()) {
parent::__construct($secret, $options);
Expand Down
38 changes: 34 additions & 4 deletions lib/Segment/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ abstract class Segment_QueueConsumer extends Segment_Consumer {
protected $max_queue_size = 1000;
protected $max_queue_size_bytes = 33554432; //32M

protected $batch_size = 100;
protected $flush_at = 100;
protected $max_batch_size_bytes = 512000; //500kb
protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s
protected $host = "";
protected $compress_request = false;

protected $flush_interval_in_mills = 10000; //frequency in milliseconds to send data, default 10

/**
* Store our secret and options as part of this consumer
* @param string $secret
Expand All @@ -26,16 +28,41 @@ public function __construct($secret, $options = array()) {
}

if (isset($options["batch_size"])) {
$this->batch_size = $options["batch_size"];
if($options["batch_size"] < 1) {
$msg = "Batch Size must not be less than 1";
error_log("[Analytics][" . $this->type . "] " . $msg);
} else {
$msg = "WARNING: batch_size option to be depricated soon, please use new option flush_at";
error_log("[Analytics][" . $this->type . "] " . $msg);
$this->flush_at = $options["batch_size"];
}
}

if (isset($options["flush_at"])) {
if($options["flush_at"] < 1) {
$msg = "Flush at Size must not be less than 1";
error_log("[Analytics][" . $this->type . "] " . $msg);
} else {
$this->flush_at = $options["flush_at"];
}
}

if (isset($options["host"])) {
$this->host = $options["host"];
}

if (isset($options["compress_request"])) {
$this->compress_request = json_decode($options["compress_request"]);
}

if (isset($options["flush_interval"])) {
if($options["flush_interval"] < 1000) {
$msg = "Flush interval must not be less than 1 second";
error_log("[Analytics][" . $this->type . "] " . $msg);
} else {
$this->flush_interval_in_mills = $options["flush_interval"];
}
}

$this->queue = array();
}
Expand Down Expand Up @@ -113,7 +140,8 @@ public function flush() {
$success = true;

while ($count > 0 && $success) {
$batch = array_splice($this->queue, 0, min($this->batch_size, $count));

$batch = array_splice($this->queue, 0, min($this->flush_at, $count));

if (mb_strlen(serialize((array)$this->queue), '8bit') >= $this->max_batch_size_bytes) {
$msg = "Batch size is larger than 500KB";
Expand All @@ -125,6 +153,8 @@ public function flush() {
$success = $this->flushBatch($batch);

$count = count($this->queue);

usleep($this->flush_interval_in_mills * 1000);
}

return $success;
Expand Down Expand Up @@ -152,7 +182,7 @@ protected function enqueue($item) {
$count = array_push($this->queue, $item);


if ($count >= $this->batch_size) {
if ($count >= $this->flush_at) {
return $this->flush(); // return ->flush() result: true on success
}

Expand Down