Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ are built by `MessageBuilder` objects, and those messages can be consumed by the
MixpanelAPI mixpanel = new MixpanelAPI();
mixpanel.deliver(delivery);

### Gzip Compression

The library supports gzip compression for both tracking events (`/track`) and importing historical events (`/import`). To enable gzip compression, pass `true` to the `MixpanelAPI` constructor:

MixpanelAPI mixpanel = new MixpanelAPI(true); // Enable gzip compression

Gzip compression can reduce bandwidth usage and improve performance, especially when sending large batches of events.

### Importing Historical Events

The library supports importing historical events (events older than 5 days that are not accepted using /track) via the `/import` endpoint. Project token will be used for basic auth.

Learn More
----------
This library in particular has more in-depth documentation at
Expand Down
75 changes: 68 additions & 7 deletions src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
*
*/
public class MixpanelAPIDemo {


public static String PROJECT_TOKEN = "2d7b8a6e7d5d7d81ff4d988bac0be9a7"; // "YOUR TOKEN";
public static String PROJECT_TOKEN = "bf2a25faaefdeed4aecde6e177d111bf"; // "YOUR TOKEN";
public static long MILLIS_TO_WAIT = 10 * 1000;

private static class DeliveryThread extends Thread {
public DeliveryThread(Queue<JSONObject> messages) {
mMixpanel = new MixpanelAPI();
public DeliveryThread(Queue<JSONObject> messages, boolean useGzipCompression) {
mMixpanel = new MixpanelAPI(useGzipCompression);
mMessageQueue = messages;
mUseGzipCompression = useGzipCompression;
}

@Override
Expand All @@ -40,7 +42,7 @@ public void run() {
do {
message = mMessageQueue.poll();
if (message != null) {
System.out.println("WILL SEND MESSAGE:\n" + message.toString());
System.out.println("WILL SEND MESSAGE" + (mUseGzipCompression ? " (with gzip compression)" : "") + ":\n" + message.toString());

messageCount = messageCount + 1;
delivery.addMessage(message);
Expand All @@ -50,7 +52,7 @@ public void run() {

mMixpanel.deliver(delivery);

System.out.println("Sent " + messageCount + " messages.");
System.out.println("Sent " + messageCount + " messages" + (mUseGzipCompression ? " with gzip compression" : "") + ".");
Thread.sleep(MILLIS_TO_WAIT);
}
} catch (IOException e) {
Expand All @@ -62,13 +64,21 @@ public void run() {

private final MixpanelAPI mMixpanel;
private final Queue<JSONObject> mMessageQueue;
private final boolean mUseGzipCompression;
}

public static void printUsage() {
System.out.println("USAGE: java com.mixpanel.mixpanelapi.demo.MixpanelAPIDemo distinct_id");
System.out.println("");
System.out.println("This is a simple program demonstrating Mixpanel's Java library.");
System.out.println("It reads lines from standard input and sends them to Mixpanel as events.");
System.out.println("");
System.out.println("The demo also shows:");
System.out.println(" - Setting user properties");
System.out.println(" - Tracking charges");
System.out.println(" - Importing historical events");
System.out.println(" - Incrementing user properties");
System.out.println(" - Using gzip compression");
}

/**
Expand All @@ -77,7 +87,12 @@ public static void printUsage() {
public static void main(String[] args)
throws IOException, InterruptedException {
Queue<JSONObject> messages = new ConcurrentLinkedQueue<JSONObject>();
DeliveryThread worker = new DeliveryThread(messages);
Queue<JSONObject> messagesWithGzip = new ConcurrentLinkedQueue<JSONObject>();

// Create two delivery threads - one without gzip and one with gzip compression
DeliveryThread worker = new DeliveryThread(messages, false);
DeliveryThread workerWithGzip = new DeliveryThread(messagesWithGzip, true);

MessageBuilder messageBuilder = new MessageBuilder(PROJECT_TOKEN);

if (args.length != 1) {
Expand All @@ -86,6 +101,8 @@ public static void main(String[] args)
}

worker.start();
workerWithGzip.start();

String distinctId = args[0];
BufferedReader inputLines = new BufferedReader(new InputStreamReader(System.in));
String line = inputLines.readLine();
Expand All @@ -101,6 +118,49 @@ public static void main(String[] args)
JSONObject transactionMessage = messageBuilder.trackCharge(distinctId, 2.50, null);
messages.add(transactionMessage);

// Import a historical event (30 days ago) with explicit time and $insert_id
long thirtyDaysAgo = System.currentTimeMillis() - (30L * 24L * 60L * 60L * 1000L);
Map<String, Object> importPropsMap = new HashMap<String, Object>();
importPropsMap.put("time", thirtyDaysAgo);
importPropsMap.put("$insert_id", "demo-import-" + System.currentTimeMillis());
importPropsMap.put("Event Type", "Historical");
importPropsMap.put("Source", "Demo Import");
JSONObject importProps = new JSONObject(importPropsMap);
JSONObject importMessage = messageBuilder.importEvent(distinctId, "Program Started", importProps);
messages.add(importMessage);

// Import another event using defaults (time and $insert_id auto-generated)
Map<String, String> simpleImportProps = new HashMap<String, String>();
simpleImportProps.put("Source", "Demo Simple Import");
JSONObject simpleImportMessage = messageBuilder.importEvent(distinctId, "Simple Import Event", new JSONObject(simpleImportProps));
messages.add(simpleImportMessage);

// Import event with no properties at all (time and $insert_id both auto-generated)
JSONObject minimalImportMessage = messageBuilder.importEvent(distinctId, "Minimal Import Event", null);
messages.add(minimalImportMessage);

// Demonstrate gzip compression by sending some messages with compression enabled
System.out.println("\n=== Demonstrating gzip compression ===");

// Send a regular event with gzip compression
Map<String, String> gzipEventProps = new HashMap<String, String>();
gzipEventProps.put("Compression", "gzip");
gzipEventProps.put("Demo", "true");
JSONObject gzipEvent = messageBuilder.event(distinctId, "Gzip Compressed Event", new JSONObject(gzipEventProps));
messagesWithGzip.add(gzipEvent);

// Send an import event with gzip compression
long historicalTime = System.currentTimeMillis() - (60L * 24L * 60L * 60L * 1000L);
Map<String, Object> gzipImportProps = new HashMap<String, Object>();
gzipImportProps.put("time", historicalTime);
gzipImportProps.put("$insert_id", "gzip-import-" + System.currentTimeMillis());
gzipImportProps.put("Compression", "gzip");
gzipImportProps.put("Event Type", "Historical with Gzip");
JSONObject gzipImportEvent = messageBuilder.importEvent(distinctId, "Gzip Compressed Import", new JSONObject(gzipImportProps));
messagesWithGzip.add(gzipImportEvent);

System.out.println("Added events to gzip compression queue\n");

while((line != null) && (line.length() > 0)) {
System.out.println("SENDING LINE: " + line);
Map<String, String> propMap = new HashMap<String, String>();
Expand All @@ -117,10 +177,11 @@ public static void main(String[] args)
line = inputLines.readLine();
}

while(! messages.isEmpty()) {
while(! messages.isEmpty() || ! messagesWithGzip.isEmpty()) {
Thread.sleep(1000);
}

worker.interrupt();
workerWithGzip.interrupt();
}
}
10 changes: 9 additions & 1 deletion src/main/java/com/mixpanel/mixpanelapi/ClientDelivery.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class ClientDelivery {
private final List<JSONObject> mEventsMessages = new ArrayList<JSONObject>();
private final List<JSONObject> mPeopleMessages = new ArrayList<JSONObject>();
private final List<JSONObject> mGroupMessages = new ArrayList<JSONObject>();
private final List<JSONObject> mImportMessages = new ArrayList<JSONObject>();

/**
* Adds an individual message to this delivery. Messages to Mixpanel are often more efficient when sent in batches.
Expand Down Expand Up @@ -41,6 +42,9 @@ else if (messageType.equals("people")) {
else if (messageType.equals("group")) {
mGroupMessages.add(messageContent);
}
else if (messageType.equals("import")) {
mImportMessages.add(messageContent);
}
} catch (JSONException e) {
throw new RuntimeException("Apparently valid mixpanel message could not be interpreted.", e);
}
Expand All @@ -63,7 +67,7 @@ public boolean isValidMessage(JSONObject message) {
if (messageContents == null) {
ret = false;
}
else if (!messageType.equals("event") && !messageType.equals("people") && !messageType.equals("group")) {
else if (!messageType.equals("event") && !messageType.equals("people") && !messageType.equals("group") && !messageType.equals("import")) {
ret = false;
}
}
Expand All @@ -86,4 +90,8 @@ else if (!messageType.equals("event") && !messageType.equals("people") && !messa
return mGroupMessages;
}

/* package */ List<JSONObject> getImportMessages() {
return mImportMessages;
}

}
1 change: 1 addition & 0 deletions src/main/java/com/mixpanel/mixpanelapi/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
/* package */ class Config {
public static final String BASE_ENDPOINT = "https://api.mixpanel.com";
public static final int MAX_MESSAGE_SIZE = 50;
public static final int IMPORT_MAX_MESSAGE_SIZE = 2000;
}
63 changes: 63 additions & 0 deletions src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;

import org.json.JSONArray;
import org.json.JSONException;
Expand Down Expand Up @@ -83,6 +84,68 @@ public JSONObject event(String distinctId, String eventName, JSONObject properti
}
}

/**
* Creates a message for importing historical events (events older than 5 days) to Mixpanel via the /import endpoint.
* This method is similar to event(), but is designed for the import endpoint which requires:
* - A custom timestamp (defaults to current time if not provided)
* - An insert_id for deduplication (auto-generated if not provided)
* - Basic authentication using the project token
*
* See:
* https://developer.mixpanel.com/reference/import-events
*
* @param distinctId a string uniquely identifying the individual cause associated with this event
* @param eventName a human readable name for the event, for example "Purchase", or "Threw Exception"
* @param properties a JSONObject associating properties with the event. Optional properties:
* - "time": timestamp in milliseconds since epoch (defaults to current time)
* - "$insert_id": unique identifier for deduplication (auto-generated if not provided)
* @return import event message for consumption by MixpanelAPI
*/
public JSONObject importEvent(String distinctId, String eventName, JSONObject properties) {
long time = System.currentTimeMillis();

// Nothing below should EVER throw a JSONException.
try {
JSONObject dataObj = new JSONObject();
dataObj.put("event", eventName);

JSONObject propertiesObj = null;
if (properties == null) {
propertiesObj = new JSONObject();
}
else {
propertiesObj = new JSONObject(properties.toString());
}
// no need to add $import true property as this is added by the backend for any event imported.
if (! propertiesObj.has("token")) propertiesObj.put("token", mToken);

// Set default time to current time if not provided
if (! propertiesObj.has("time")) propertiesObj.put("time", time);

// Generate default $insert_id if not provided (to prevent duplicates)
// Uses UUID v4 (random) in hex format, matching Python SDK implementation
if (! propertiesObj.has("$insert_id")) {
String insertId = UUID.randomUUID().toString().replace("-", "");
propertiesObj.put("$insert_id", insertId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a known/recommended solution for generating $insert_ids?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs for import recommend a UUID or computing a hash of some set of properties.

For consistency with Python SDK import we can use UUIDv4.

Fixed here: 59fac6a

}

if (! propertiesObj.has("mp_lib")) propertiesObj.put("mp_lib", "jdk");

if (distinctId != null)
propertiesObj.put("distinct_id", distinctId);

dataObj.put("properties", propertiesObj);

JSONObject envelope = new JSONObject();
envelope.put("envelope_version", 1);
envelope.put("message_type", "import");
envelope.put("message", dataObj);
return envelope;
} catch (JSONException e) {
throw new RuntimeException("Can't construct a Mixpanel import message", e);
}
}

/**
* Sets a property on the profile associated with the given distinctId. When
* sent, this message will overwrite any existing values for the given
Expand Down
Loading