diff --git a/README.md b/README.md index 44542ec..a4d254d 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,18 @@ are built by `MessageBuilder` objects, and those messages can be consumed by the MixpanelAPI mixpanel = new MixpanelAPI(); mixpanel.deliver(delivery); +### Gzip Compression + +The library supports gzip compression for both tracking events (`/track`) and importing historical events (`/import`). To enable gzip compression, pass `true` to the `MixpanelAPI` constructor: + + MixpanelAPI mixpanel = new MixpanelAPI(true); // Enable gzip compression + +Gzip compression can reduce bandwidth usage and improve performance, especially when sending large batches of events. + +### Importing Historical Events + +The library supports importing historical events (events older than 5 days that are not accepted using /track) via the `/import` endpoint. Project token will be used for basic auth. + Learn More ---------- This library in particular has more in-depth documentation at diff --git a/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java b/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java index 39ae59f..7304834 100644 --- a/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java +++ b/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java @@ -20,14 +20,16 @@ * */ public class MixpanelAPIDemo { + - public static String PROJECT_TOKEN = "2d7b8a6e7d5d7d81ff4d988bac0be9a7"; // "YOUR TOKEN"; + public static String PROJECT_TOKEN = "bf2a25faaefdeed4aecde6e177d111bf"; // "YOUR TOKEN"; public static long MILLIS_TO_WAIT = 10 * 1000; private static class DeliveryThread extends Thread { - public DeliveryThread(Queue messages) { - mMixpanel = new MixpanelAPI(); + public DeliveryThread(Queue messages, boolean useGzipCompression) { + mMixpanel = new MixpanelAPI(useGzipCompression); mMessageQueue = messages; + mUseGzipCompression = useGzipCompression; } @Override @@ -40,7 +42,7 @@ public void run() { do { message = mMessageQueue.poll(); if (message != null) { - System.out.println("WILL SEND MESSAGE:\n" + message.toString()); + System.out.println("WILL SEND MESSAGE" + (mUseGzipCompression ? " (with gzip compression)" : "") + ":\n" + message.toString()); messageCount = messageCount + 1; delivery.addMessage(message); @@ -50,7 +52,7 @@ public void run() { mMixpanel.deliver(delivery); - System.out.println("Sent " + messageCount + " messages."); + System.out.println("Sent " + messageCount + " messages" + (mUseGzipCompression ? " with gzip compression" : "") + "."); Thread.sleep(MILLIS_TO_WAIT); } } catch (IOException e) { @@ -62,6 +64,7 @@ public void run() { private final MixpanelAPI mMixpanel; private final Queue mMessageQueue; + private final boolean mUseGzipCompression; } public static void printUsage() { @@ -69,6 +72,13 @@ public static void printUsage() { System.out.println(""); System.out.println("This is a simple program demonstrating Mixpanel's Java library."); System.out.println("It reads lines from standard input and sends them to Mixpanel as events."); + System.out.println(""); + System.out.println("The demo also shows:"); + System.out.println(" - Setting user properties"); + System.out.println(" - Tracking charges"); + System.out.println(" - Importing historical events"); + System.out.println(" - Incrementing user properties"); + System.out.println(" - Using gzip compression"); } /** @@ -77,7 +87,12 @@ public static void printUsage() { public static void main(String[] args) throws IOException, InterruptedException { Queue messages = new ConcurrentLinkedQueue(); - DeliveryThread worker = new DeliveryThread(messages); + Queue messagesWithGzip = new ConcurrentLinkedQueue(); + + // Create two delivery threads - one without gzip and one with gzip compression + DeliveryThread worker = new DeliveryThread(messages, false); + DeliveryThread workerWithGzip = new DeliveryThread(messagesWithGzip, true); + MessageBuilder messageBuilder = new MessageBuilder(PROJECT_TOKEN); if (args.length != 1) { @@ -86,6 +101,8 @@ public static void main(String[] args) } worker.start(); + workerWithGzip.start(); + String distinctId = args[0]; BufferedReader inputLines = new BufferedReader(new InputStreamReader(System.in)); String line = inputLines.readLine(); @@ -101,6 +118,49 @@ public static void main(String[] args) JSONObject transactionMessage = messageBuilder.trackCharge(distinctId, 2.50, null); messages.add(transactionMessage); + // Import a historical event (30 days ago) with explicit time and $insert_id + long thirtyDaysAgo = System.currentTimeMillis() - (30L * 24L * 60L * 60L * 1000L); + Map importPropsMap = new HashMap(); + importPropsMap.put("time", thirtyDaysAgo); + importPropsMap.put("$insert_id", "demo-import-" + System.currentTimeMillis()); + importPropsMap.put("Event Type", "Historical"); + importPropsMap.put("Source", "Demo Import"); + JSONObject importProps = new JSONObject(importPropsMap); + JSONObject importMessage = messageBuilder.importEvent(distinctId, "Program Started", importProps); + messages.add(importMessage); + + // Import another event using defaults (time and $insert_id auto-generated) + Map simpleImportProps = new HashMap(); + simpleImportProps.put("Source", "Demo Simple Import"); + JSONObject simpleImportMessage = messageBuilder.importEvent(distinctId, "Simple Import Event", new JSONObject(simpleImportProps)); + messages.add(simpleImportMessage); + + // Import event with no properties at all (time and $insert_id both auto-generated) + JSONObject minimalImportMessage = messageBuilder.importEvent(distinctId, "Minimal Import Event", null); + messages.add(minimalImportMessage); + + // Demonstrate gzip compression by sending some messages with compression enabled + System.out.println("\n=== Demonstrating gzip compression ==="); + + // Send a regular event with gzip compression + Map gzipEventProps = new HashMap(); + gzipEventProps.put("Compression", "gzip"); + gzipEventProps.put("Demo", "true"); + JSONObject gzipEvent = messageBuilder.event(distinctId, "Gzip Compressed Event", new JSONObject(gzipEventProps)); + messagesWithGzip.add(gzipEvent); + + // Send an import event with gzip compression + long historicalTime = System.currentTimeMillis() - (60L * 24L * 60L * 60L * 1000L); + Map gzipImportProps = new HashMap(); + gzipImportProps.put("time", historicalTime); + gzipImportProps.put("$insert_id", "gzip-import-" + System.currentTimeMillis()); + gzipImportProps.put("Compression", "gzip"); + gzipImportProps.put("Event Type", "Historical with Gzip"); + JSONObject gzipImportEvent = messageBuilder.importEvent(distinctId, "Gzip Compressed Import", new JSONObject(gzipImportProps)); + messagesWithGzip.add(gzipImportEvent); + + System.out.println("Added events to gzip compression queue\n"); + while((line != null) && (line.length() > 0)) { System.out.println("SENDING LINE: " + line); Map propMap = new HashMap(); @@ -117,10 +177,11 @@ public static void main(String[] args) line = inputLines.readLine(); } - while(! messages.isEmpty()) { + while(! messages.isEmpty() || ! messagesWithGzip.isEmpty()) { Thread.sleep(1000); } worker.interrupt(); + workerWithGzip.interrupt(); } } diff --git a/src/main/java/com/mixpanel/mixpanelapi/ClientDelivery.java b/src/main/java/com/mixpanel/mixpanelapi/ClientDelivery.java index a166161..6ba467e 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/ClientDelivery.java +++ b/src/main/java/com/mixpanel/mixpanelapi/ClientDelivery.java @@ -14,6 +14,7 @@ public class ClientDelivery { private final List mEventsMessages = new ArrayList(); private final List mPeopleMessages = new ArrayList(); private final List mGroupMessages = new ArrayList(); + private final List mImportMessages = new ArrayList(); /** * Adds an individual message to this delivery. Messages to Mixpanel are often more efficient when sent in batches. @@ -41,6 +42,9 @@ else if (messageType.equals("people")) { else if (messageType.equals("group")) { mGroupMessages.add(messageContent); } + else if (messageType.equals("import")) { + mImportMessages.add(messageContent); + } } catch (JSONException e) { throw new RuntimeException("Apparently valid mixpanel message could not be interpreted.", e); } @@ -63,7 +67,7 @@ public boolean isValidMessage(JSONObject message) { if (messageContents == null) { ret = false; } - else if (!messageType.equals("event") && !messageType.equals("people") && !messageType.equals("group")) { + else if (!messageType.equals("event") && !messageType.equals("people") && !messageType.equals("group") && !messageType.equals("import")) { ret = false; } } @@ -86,4 +90,8 @@ else if (!messageType.equals("event") && !messageType.equals("people") && !messa return mGroupMessages; } + /* package */ List getImportMessages() { + return mImportMessages; + } + } diff --git a/src/main/java/com/mixpanel/mixpanelapi/Config.java b/src/main/java/com/mixpanel/mixpanelapi/Config.java index d0564db..50b8bc7 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/Config.java +++ b/src/main/java/com/mixpanel/mixpanelapi/Config.java @@ -3,4 +3,5 @@ /* package */ class Config { public static final String BASE_ENDPOINT = "https://api.mixpanel.com"; public static final int MAX_MESSAGE_SIZE = 50; + public static final int IMPORT_MAX_MESSAGE_SIZE = 2000; } diff --git a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java index 4462c38..45ed07d 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java @@ -7,6 +7,7 @@ import java.util.Iterator; import java.util.Map; import java.util.TimeZone; +import java.util.UUID; import org.json.JSONArray; import org.json.JSONException; @@ -83,6 +84,68 @@ public JSONObject event(String distinctId, String eventName, JSONObject properti } } + /** + * Creates a message for importing historical events (events older than 5 days) to Mixpanel via the /import endpoint. + * This method is similar to event(), but is designed for the import endpoint which requires: + * - A custom timestamp (defaults to current time if not provided) + * - An insert_id for deduplication (auto-generated if not provided) + * - Basic authentication using the project token + * + * See: + * https://developer.mixpanel.com/reference/import-events + * + * @param distinctId a string uniquely identifying the individual cause associated with this event + * @param eventName a human readable name for the event, for example "Purchase", or "Threw Exception" + * @param properties a JSONObject associating properties with the event. Optional properties: + * - "time": timestamp in milliseconds since epoch (defaults to current time) + * - "$insert_id": unique identifier for deduplication (auto-generated if not provided) + * @return import event message for consumption by MixpanelAPI + */ + public JSONObject importEvent(String distinctId, String eventName, JSONObject properties) { + long time = System.currentTimeMillis(); + + // Nothing below should EVER throw a JSONException. + try { + JSONObject dataObj = new JSONObject(); + dataObj.put("event", eventName); + + JSONObject propertiesObj = null; + if (properties == null) { + propertiesObj = new JSONObject(); + } + else { + propertiesObj = new JSONObject(properties.toString()); + } + // no need to add $import true property as this is added by the backend for any event imported. + if (! propertiesObj.has("token")) propertiesObj.put("token", mToken); + + // Set default time to current time if not provided + if (! propertiesObj.has("time")) propertiesObj.put("time", time); + + // Generate default $insert_id if not provided (to prevent duplicates) + // Uses UUID v4 (random) in hex format, matching Python SDK implementation + if (! propertiesObj.has("$insert_id")) { + String insertId = UUID.randomUUID().toString().replace("-", ""); + propertiesObj.put("$insert_id", insertId); + } + + if (! propertiesObj.has("mp_lib")) propertiesObj.put("mp_lib", "jdk"); + + if (distinctId != null) + propertiesObj.put("distinct_id", distinctId); + + dataObj.put("properties", propertiesObj); + + JSONObject envelope = new JSONObject(); + envelope.put("envelope_version", 1); + envelope.put("message_type", "import"); + envelope.put("message", dataObj); + return envelope; + } catch (JSONException e) { + throw new RuntimeException("Can't construct a Mixpanel import message", e); + } + } + /** * Sets a property on the profile associated with the given distinctId. When * sent, this message will overwrite any existing values for the given diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index be4d01b..e1734a4 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -5,11 +5,14 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; import java.net.URL; import java.net.URLConnection; import java.net.URLEncoder; import java.util.List; +import java.util.zip.GZIPOutputStream; import org.json.JSONArray; +import org.json.JSONException; import org.json.JSONObject; /** @@ -33,16 +36,27 @@ public class MixpanelAPI { protected final String mEventsEndpoint; protected final String mPeopleEndpoint; protected final String mGroupsEndpoint; + protected final String mImportEndpoint; + protected final boolean mUseGzipCompression; /** * Constructs a MixpanelAPI object associated with the production, Mixpanel services. */ public MixpanelAPI() { - this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage", Config.BASE_ENDPOINT + "/groups"); + this(false); } /** - * Create a MixpaneAPI associated with custom URLS for events and people updates. + * Constructs a MixpanelAPI object associated with the production, Mixpanel services. + * + * @param useGzipCompression whether to use gzip compression for network requests + */ + public MixpanelAPI(boolean useGzipCompression) { + this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage", Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", useGzipCompression); + } + + /** + * Create a MixpanelAPI associated with custom URLS for events and people updates. * * Useful for testing and proxying. Most callers should use the constructor with no arguments. * @@ -51,13 +65,11 @@ public MixpanelAPI() { * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) { - mEventsEndpoint = eventsEndpoint; - mPeopleEndpoint = peopleEndpoint; - mGroupsEndpoint = Config.BASE_ENDPOINT + "/groups"; + this(eventsEndpoint, peopleEndpoint, Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", false); } /** - * Create a MixpaneAPI associated with custom URLS for the Mixpanel service. + * Create a MixpanelAPI associated with custom URLS for the Mixpanel service. * * Useful for testing and proxying. Most callers should use the constructor with no arguments. * @@ -67,9 +79,42 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) { * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint) { + this(eventsEndpoint, peopleEndpoint, groupsEndpoint, Config.BASE_ENDPOINT + "/import", false); + } + + /** + * Create a MixpanelAPI associated with custom URLS for the Mixpanel service. + * + * Useful for testing and proxying. Most callers should use the constructor with no arguments. + * + * @param eventsEndpoint a URL that will accept Mixpanel events messages + * @param peopleEndpoint a URL that will accept Mixpanel people messages + * @param groupsEndpoint a URL that will accept Mixpanel groups messages + * @param importEndpoint a URL that will accept Mixpanel import messages + * @see #MixpanelAPI() + */ + public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint) { + this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, false); + } + + /** + * Create a MixpanelAPI associated with custom URLS for the Mixpanel service. + * + * Useful for testing and proxying. Most callers should use the constructor with no arguments. + * + * @param eventsEndpoint a URL that will accept Mixpanel events messages + * @param peopleEndpoint a URL that will accept Mixpanel people messages + * @param groupsEndpoint a URL that will accept Mixpanel groups messages + * @param importEndpoint a URL that will accept Mixpanel import messages + * @param useGzipCompression whether to use gzip compression for network requests + * @see #MixpanelAPI() + */ + public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint, boolean useGzipCompression) { mEventsEndpoint = eventsEndpoint; mPeopleEndpoint = peopleEndpoint; mGroupsEndpoint = groupsEndpoint; + mImportEndpoint = importEndpoint; + mUseGzipCompression = useGzipCompression; } /** @@ -127,6 +172,13 @@ public void deliver(ClientDelivery toSend, boolean useIpAddress) throws IOExcept String groupsUrl = mGroupsEndpoint + "?" + ipParameter; List groupMessages = toSend.getGroupMessages(); sendMessages(groupMessages, groupsUrl); + + // Handle import messages - use strict mode and extract token for auth + List importMessages = toSend.getImportMessages(); + if (importMessages.size() > 0) { + String importUrl = mImportEndpoint + "?strict=1"; + sendImportMessages(importMessages, importUrl); + } } /** @@ -155,15 +207,45 @@ protected String encodeDataString(String dataString) { conn.setReadTimeout(READ_TIMEOUT_MILLIS); conn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); conn.setDoOutput(true); - conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=utf8"); - String encodedData = encodeDataString(dataString); - String encodedQuery = "data=" + encodedData; + byte[] dataToSend; + if (mUseGzipCompression) { + // Use gzip compression + conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=utf8"); + conn.setRequestProperty("Content-Encoding", "gzip"); + + String encodedData = encodeDataString(dataString); + String encodedQuery = "data=" + encodedData; + + // Compress the data + java.io.ByteArrayOutputStream byteStream = new java.io.ByteArrayOutputStream(); + GZIPOutputStream gzipStream = null; + try { + gzipStream = new GZIPOutputStream(byteStream); + gzipStream.write(encodedQuery.getBytes("utf-8")); + gzipStream.finish(); + dataToSend = byteStream.toByteArray(); + } finally { + if (gzipStream != null) { + try { + gzipStream.close(); + } catch (IOException e) { + // ignore + } + } + } + } else { + // No compression + conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=utf8"); + String encodedData = encodeDataString(dataString); + String encodedQuery = "data=" + encodedData; + dataToSend = encodedQuery.getBytes("utf-8"); + } OutputStream postStream = null; try { postStream = conn.getOutputStream(); - postStream.write(encodedQuery.getBytes()); + postStream.write(dataToSend); } finally { if (postStream != null) { try { @@ -209,6 +291,42 @@ private void sendMessages(List messages, String endpointUrl) throws } } + private void sendImportMessages(List messages, String endpointUrl) throws IOException { + // Extract token from first message for authentication + // If token is missing, we'll still attempt to send and let the server reject it + String token = ""; + if (messages.size() > 0) { + try { + JSONObject firstMessage = messages.get(0); + if (firstMessage.has("properties")) { + JSONObject properties = firstMessage.getJSONObject("properties"); + if (properties.has("token")) { + token = properties.getString("token"); + } + } + } catch (JSONException e) { + // Malformed message - continue with empty token and let server reject it + } + } + + // Send messages in batches (max 2000 per batch for /import) + // If token is empty, the server will reject with 401 Unauthorized + for (int i = 0; i < messages.size(); i += Config.IMPORT_MAX_MESSAGE_SIZE) { + int endIndex = i + Config.IMPORT_MAX_MESSAGE_SIZE; + endIndex = Math.min(endIndex, messages.size()); + List batch = messages.subList(i, endIndex); + + if (batch.size() > 0) { + String messagesString = dataString(batch); + boolean accepted = sendImportData(messagesString, endpointUrl, token); + + if (! accepted) { + throw new MixpanelServerException("Server refused to accept import messages, they may be malformed.", batch); + } + } + } + } + private String dataString(List messages) { JSONArray array = new JSONArray(); for (JSONObject message:messages) { @@ -218,6 +336,130 @@ private String dataString(List messages) { return array.toString(); } + /** + * Sends import data to the /import endpoint with Basic Auth using the project token. + * The /import endpoint requires: + * - JSON content type (not URL-encoded like /track) + * - Basic authentication with token as username and empty password + * - strict=1 parameter for validation + * + * @param dataString JSON array of events to import + * @param endpointUrl The import endpoint URL + * @param token The project token for Basic Auth + * @return true if the server accepted the data + * @throws IOException if there's a network error + */ + /* package */ boolean sendImportData(String dataString, String endpointUrl, String token) throws IOException { + URL endpoint = new URL(endpointUrl); + HttpURLConnection conn = (HttpURLConnection) endpoint.openConnection(); + conn.setReadTimeout(READ_TIMEOUT_MILLIS); + conn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); + conn.setDoOutput(true); + conn.setRequestMethod("POST"); + conn.setRequestProperty("Content-Type", "application/json"); + + // Add Basic Auth header: username is token, password is empty + try { + String authString = token + ":"; + byte[] authBytes = authString.getBytes("utf-8"); + String base64Auth = new String(Base64Coder.encode(authBytes)); + conn.setRequestProperty("Authorization", "Basic " + base64Auth); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("Mixpanel library requires utf-8 support", e); + } + + byte[] dataToSend; + if (mUseGzipCompression) { + // Use gzip compression + conn.setRequestProperty("Content-Encoding", "gzip"); + + // Compress the data + java.io.ByteArrayOutputStream byteStream = new java.io.ByteArrayOutputStream(); + GZIPOutputStream gzipStream = null; + try { + gzipStream = new GZIPOutputStream(byteStream); + gzipStream.write(dataString.getBytes("utf-8")); + gzipStream.finish(); + dataToSend = byteStream.toByteArray(); + } finally { + if (gzipStream != null) { + try { + gzipStream.close(); + } catch (IOException e) { + // ignore + } + } + } + } else { + // No compression + dataToSend = dataString.getBytes("utf-8"); + } + + OutputStream postStream = null; + try { + postStream = conn.getOutputStream(); + postStream.write(dataToSend); + } finally { + if (postStream != null) { + try { + postStream.close(); + } catch (IOException e) { + // ignore, in case we've already thrown + } + } + } + + InputStream responseStream = null; + String response = null; + try { + responseStream = conn.getInputStream(); + response = slurp(responseStream); + } catch (IOException e) { + // HTTP error codes (401, 400, etc.) throw IOException when calling getInputStream() + // Check if it's an HTTP error and read the error stream for details + InputStream errorStream = conn.getErrorStream(); + if (errorStream != null) { + try { + slurp(errorStream); + errorStream.close(); + // Return false to indicate rejection, which will throw MixpanelServerException + return false; + } catch (IOException ignored) { + // If we can't read the error stream, just let the original exception propagate + } + } + // Network error or other IOException - propagate it + throw e; + } finally { + if (responseStream != null) { + try { + responseStream.close(); + } catch (IOException e) { + // ignore, in case we've already thrown + } + } + } + + // Import endpoint returns JSON like {"code":200,"status":"OK","num_records_imported":N} + if (response == null) { + return false; + } + + // Parse JSON response + try { + JSONObject jsonResponse = new JSONObject(response); + + // Check for {"status":"OK"} and {"code":200} + boolean statusOk = jsonResponse.has("status") && "OK".equals(jsonResponse.getString("status")); + boolean codeOk = jsonResponse.has("code") && jsonResponse.getInt("code") == 200; + + return statusOk && codeOk; + } catch (JSONException e) { + // Not valid JSON or missing expected fields + return false; + } + } + private String slurp(InputStream in) throws IOException { final StringBuilder out = new StringBuilder(); InputStreamReader reader = new InputStreamReader(in, "utf8"); diff --git a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java index 0e23689..d86155d 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java @@ -1,5 +1,6 @@ package com.mixpanel.mixpanelapi; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -8,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.zip.GZIPInputStream; import org.json.JSONArray; import org.json.JSONException; @@ -700,4 +702,421 @@ private void checkProfileProps(String operation, JSONObject built) { } } + public void testImportEvent() { + // Test creating an import event message + try { + // Time more than 5 days ago and less than 1 year ago (30 days) + long historicalTime = System.currentTimeMillis() - (30L * 24L * 60L * 60L * 1000L); + + JSONObject properties = new JSONObject(); + properties.put("time", historicalTime); + properties.put("$insert_id", "test-insert-id-123"); + properties.put("prop key", "prop value"); + + JSONObject importMessage = mBuilder.importEvent("a distinct id", "Historical Event", properties); + + // Verify the message structure + assertTrue("Message is valid", new ClientDelivery().isValidMessage(importMessage)); + assertEquals("Message type is import", "import", importMessage.getString("message_type")); + + JSONObject message = importMessage.getJSONObject("message"); + assertEquals("Event name correct", "Historical Event", message.getString("event")); + + JSONObject props = message.getJSONObject("properties"); + assertEquals("distinct_id correct", "a distinct id", props.getString("distinct_id")); + assertEquals("time correct", historicalTime, props.getLong("time")); + assertEquals("$insert_id correct", "test-insert-id-123", props.getString("$insert_id")); + assertEquals("token present", "a token", props.getString("token")); + assertEquals("custom property present", "prop value", props.getString("prop key")); + } catch (JSONException e) { + fail("Failed to create or parse import event: " + e.toString()); + } + } + + public void testImportMessageDelivery() { + // Test that import messages are properly sent + final Map sawData = new HashMap(); + final Map sawToken = new HashMap(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + sawData.put(endpointUrl, dataString); + sawToken.put(endpointUrl, token); + return true; + } + }; + + ClientDelivery c = new ClientDelivery(); + + // Create import events with historical timestamps (90 days ago, within >5 days and <1 year range) + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + JSONObject props1 = new JSONObject(); + props1.put("time", historicalTime); + props1.put("$insert_id", "import-id-1"); + props1.put("Item", "Widget"); + + JSONObject importEvent1 = mBuilder.importEvent("a distinct id", "Purchase", props1); + c.addMessage(importEvent1); + + JSONObject props2 = new JSONObject(); + props2.put("time", historicalTime + 1000); + props2.put("$insert_id", "import-id-2"); + props2.put("Page", "Home"); + + JSONObject importEvent2 = mBuilder.importEvent("a distinct id", "Page View", props2); + c.addMessage(importEvent2); + + api.deliver(c); + + // Verify the import data was sent + String importData = sawData.get("import url?strict=1"); + assertNotNull("Import data was sent", importData); + + // Verify token was extracted and used for auth + String usedToken = sawToken.get("import url?strict=1"); + assertEquals("Token extracted correctly", "a token", usedToken); + + // Parse and verify the import data + JSONArray sentMessages = new JSONArray(importData); + assertEquals("Two import messages sent", 2, sentMessages.length()); + + JSONObject sentEvent1 = sentMessages.getJSONObject(0); + assertEquals("First event name correct", "Purchase", sentEvent1.getString("event")); + + JSONObject sentProps1 = sentEvent1.getJSONObject("properties"); + assertEquals("First event distinct_id correct", "a distinct id", sentProps1.getString("distinct_id")); + assertTrue("First event has $insert_id", sentProps1.has("$insert_id")); + + } catch (IOException e) { + fail("IOException during delivery: " + e.toString()); + } catch (JSONException e) { + fail("JSON parsing error: " + e.toString()); + } + } + + public void testImportLargeBatch() { + // Test that import messages respect the 2000 message batch size limit + final List sends = new ArrayList(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + sends.add(dataString); + return true; + } + }; + + ClientDelivery c = new ClientDelivery(); + // Use 180 days ago (6 months, within >5 days and <1 year range) + long historicalTime = System.currentTimeMillis() - (180L * 24L * 60L * 60L * 1000L); + + // Create more than 2000 import events + int totalEvents = 2500; + for (int i = 0; i < totalEvents; i++) { + try { + JSONObject props = new JSONObject(); + props.put("time", historicalTime + i); + props.put("$insert_id", "insert-id-" + i); + props.put("count", i); + + JSONObject importEvent = mBuilder.importEvent("a distinct id", "Test Event", props); + c.addMessage(importEvent); + } catch (JSONException e) { + fail("Failed to create import event: " + e.toString()); + } + } + + try { + api.deliver(c); + + // Should be split into 2 batches (2000 + 500) + assertEquals("Messages split into batches", 2, sends.size()); + + JSONArray firstBatch = new JSONArray(sends.get(0)); + assertEquals("First batch has 2000 events", Config.IMPORT_MAX_MESSAGE_SIZE, firstBatch.length()); + + JSONArray secondBatch = new JSONArray(sends.get(1)); + assertEquals("Second batch has 500 events", 500, secondBatch.length()); + + } catch (IOException e) { + fail("IOException during delivery: " + e.toString()); + } catch (JSONException e) { + fail("JSON parsing error: " + e.toString()); + } + } + + public void testImportMessageValidation() { + // Test that import messages are validated correctly + ClientDelivery c = new ClientDelivery(); + + // Use 300 days ago (within >5 days and <1 year range) + long historicalTime = System.currentTimeMillis() - (300L * 24L * 60L * 60L * 1000L); + + try { + JSONObject properties = new JSONObject(); + properties.put("time", historicalTime); + properties.put("$insert_id", "validation-test-id"); + + JSONObject importMessage = mBuilder.importEvent("a distinct id", "Test", properties); + + assertTrue("Import message is valid", c.isValidMessage(importMessage)); + + // Add to delivery and verify it's in the import messages list + c.addMessage(importMessage); + assertEquals("Import message added to import list", 1, c.getImportMessages().size()); + assertEquals("No regular event messages", 0, c.getEventsMessages().size()); + + } catch (JSONException e) { + fail("JSON error: " + e.toString()); + } + } + + public void testImportEventWithDefaults() { + // Test that import events automatically generate time and $insert_id if not provided + long beforeTime = System.currentTimeMillis(); + + try { + // Test 1: No properties at all - should generate both time and $insert_id + JSONObject importMessage1 = mBuilder.importEvent("user-123", "Test Event", null); + + assertTrue("Message is valid", new ClientDelivery().isValidMessage(importMessage1)); + assertEquals("Message type is import", "import", importMessage1.getString("message_type")); + + JSONObject message1 = importMessage1.getJSONObject("message"); + JSONObject props1 = message1.getJSONObject("properties"); + + assertTrue("Has auto-generated time", props1.has("time")); + long generatedTime = props1.getLong("time"); + assertTrue("Generated time is recent", generatedTime >= beforeTime && generatedTime <= System.currentTimeMillis()); + + assertTrue("Has auto-generated $insert_id", props1.has("$insert_id")); + String insertId1 = props1.getString("$insert_id"); + assertEquals("$insert_id is 32 characters (UUID hex format)", 32, insertId1.length()); + assertTrue("$insert_id is valid hex", insertId1.matches("[0-9a-f]{32}")); + + // Test 2: Empty properties object - should generate both + JSONObject emptyProps = new JSONObject(); + JSONObject importMessage2 = mBuilder.importEvent("user-456", "Another Event", emptyProps); + + JSONObject props2 = importMessage2.getJSONObject("message").getJSONObject("properties"); + assertTrue("Has auto-generated time", props2.has("time")); + assertTrue("Has auto-generated $insert_id", props2.has("$insert_id")); + + String insertId2 = props2.getString("$insert_id"); + assertFalse("Different events get different insert_ids", insertId1.equals(insertId2)); + + // Test 3: Custom time provided, should generate $insert_id only + long customTime = System.currentTimeMillis() - (30L * 24L * 60L * 60L * 1000L); + JSONObject propsWithTime = new JSONObject(); + propsWithTime.put("time", customTime); + + JSONObject importMessage3 = mBuilder.importEvent("user-789", "Custom Time Event", propsWithTime); + JSONObject props3 = importMessage3.getJSONObject("message").getJSONObject("properties"); + + assertEquals("Custom time preserved", customTime, props3.getLong("time")); + assertTrue("$insert_id auto-generated", props3.has("$insert_id")); + + // Test 4: Custom $insert_id provided, should generate time only + JSONObject propsWithInsertId = new JSONObject(); + propsWithInsertId.put("$insert_id", "my-custom-insert-id"); + + JSONObject importMessage4 = mBuilder.importEvent("user-abc", "Custom Insert ID Event", propsWithInsertId); + JSONObject props4 = importMessage4.getJSONObject("message").getJSONObject("properties"); + + assertTrue("Time auto-generated", props4.has("time")); + assertEquals("Custom $insert_id preserved", "my-custom-insert-id", props4.getString("$insert_id")); + + // Test 5: Both custom time and $insert_id provided - should preserve both + JSONObject propsWithBoth = new JSONObject(); + propsWithBoth.put("time", customTime); + propsWithBoth.put("$insert_id", "fully-custom-id"); + + JSONObject importMessage5 = mBuilder.importEvent("user-xyz", "Fully Custom Event", propsWithBoth); + JSONObject props5 = importMessage5.getJSONObject("message").getJSONObject("properties"); + + assertEquals("Custom time preserved", customTime, props5.getLong("time")); + assertEquals("Custom $insert_id preserved", "fully-custom-id", props5.getString("$insert_id")); + + } catch (JSONException e) { + fail("JSON error: " + e.toString()); + } + } + + public void testGzipCompressionEnabled() { + // Test that gzip compression is properly enabled and data is compressed + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url", true) { + @Override + public boolean sendData(String dataString, String endpointUrl) throws IOException { + // This method should be called with gzip compression enabled + fail("sendData should not be called directly when testing at this level"); + return true; + } + }; + + // Verify the API was created with gzip compression enabled + assertTrue("Gzip compression should be enabled", api.mUseGzipCompression); + } + + public void testGzipCompressionDisabled() { + // Test that gzip compression is disabled by default + MixpanelAPI api1 = new MixpanelAPI(); + assertFalse("Gzip compression should be disabled by default", api1.mUseGzipCompression); + + MixpanelAPI api2 = new MixpanelAPI(false); + assertFalse("Gzip compression should be disabled when explicitly set to false", api2.mUseGzipCompression); + + MixpanelAPI api3 = new MixpanelAPI("events url", "people url"); + assertFalse("Gzip compression should be disabled by default for custom endpoints", api3.mUseGzipCompression); + } + + public void testGzipCompressionDataIntegrity() { + // Test that data compressed with gzip can be decompressed correctly + final Map capturedCompressedData = new HashMap(); + final Map capturedOriginalData = new HashMap(); + + MixpanelAPI apiWithGzip = new MixpanelAPI("events url", "people url", "groups url", "import url", true) { + @Override + public boolean sendData(String dataString, String endpointUrl) throws IOException { + capturedOriginalData.put(endpointUrl, dataString); + + // Simulate what the real sendData does with gzip + if (mUseGzipCompression) { + try { + String encodedData = encodeDataString(dataString); + String encodedQuery = "data=" + encodedData; + + java.io.ByteArrayOutputStream byteStream = new java.io.ByteArrayOutputStream(); + java.util.zip.GZIPOutputStream gzipStream = new java.util.zip.GZIPOutputStream(byteStream); + gzipStream.write(encodedQuery.getBytes("utf-8")); + gzipStream.finish(); + gzipStream.close(); + + capturedCompressedData.put(endpointUrl, byteStream.toByteArray()); + } catch (Exception e) { + throw new IOException("Compression failed", e); + } + } + + return true; + } + }; + + ClientDelivery delivery = new ClientDelivery(); + JSONObject event = mBuilder.event("test-user", "Test Event", mSampleProps); + delivery.addMessage(event); + + try { + apiWithGzip.deliver(delivery); + + // Verify data was captured + String eventUrl = "events url?ip=0"; + assertTrue("Original data was captured", capturedOriginalData.containsKey(eventUrl)); + assertTrue("Compressed data was captured", capturedCompressedData.containsKey(eventUrl)); + + // Verify compressed data is smaller than original (for typical data) + byte[] compressedBytes = capturedCompressedData.get(eventUrl); + String originalData = capturedOriginalData.get(eventUrl); + String encodedData = apiWithGzip.encodeDataString(originalData); + String encodedQuery = "data=" + encodedData; + + assertTrue("Compressed data exists", compressedBytes.length > 0); + + // Decompress and verify data integrity + ByteArrayInputStream byteStream = new ByteArrayInputStream(compressedBytes); + GZIPInputStream gzipStream = new GZIPInputStream(byteStream); + java.io.ByteArrayOutputStream decompressedStream = new java.io.ByteArrayOutputStream(); + + byte[] buffer = new byte[1024]; + int len; + while ((len = gzipStream.read(buffer)) > 0) { + decompressedStream.write(buffer, 0, len); + } + gzipStream.close(); + + String decompressedData = decompressedStream.toString("utf-8"); + assertEquals("Decompressed data matches original", encodedQuery, decompressedData); + + } catch (IOException e) { + fail("IOException during gzip test: " + e.toString()); + } + } + + public void testGzipCompressionForImport() { + // Test that gzip compression works for import endpoint + final Map capturedCompressedData = new HashMap(); + final Map capturedOriginalData = new HashMap(); + + MixpanelAPI apiWithGzip = new MixpanelAPI("events url", "people url", "groups url", "import url", true) { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) throws IOException { + capturedOriginalData.put(endpointUrl, dataString); + + // Simulate what the real sendImportData does with gzip + if (mUseGzipCompression) { + try { + java.io.ByteArrayOutputStream byteStream = new java.io.ByteArrayOutputStream(); + java.util.zip.GZIPOutputStream gzipStream = new java.util.zip.GZIPOutputStream(byteStream); + gzipStream.write(dataString.getBytes("utf-8")); + gzipStream.finish(); + gzipStream.close(); + + capturedCompressedData.put(endpointUrl, byteStream.toByteArray()); + } catch (Exception e) { + throw new IOException("Compression failed", e); + } + } + + return true; + } + }; + + ClientDelivery delivery = new ClientDelivery(); + + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + try { + JSONObject props = new JSONObject(); + props.put("time", historicalTime); + props.put("$insert_id", "gzip-test-id"); + + JSONObject importEvent = mBuilder.importEvent("test-user", "Historical Event", props); + delivery.addMessage(importEvent); + + apiWithGzip.deliver(delivery); + + // Verify data was captured + String importUrl = "import url?strict=1"; + assertTrue("Original data was captured", capturedOriginalData.containsKey(importUrl)); + assertTrue("Compressed data was captured", capturedCompressedData.containsKey(importUrl)); + + // Verify compressed data can be decompressed + byte[] compressedBytes = capturedCompressedData.get(importUrl); + String originalData = capturedOriginalData.get(importUrl); + + assertTrue("Compressed data exists", compressedBytes.length > 0); + + // Decompress and verify data integrity + ByteArrayInputStream byteStream = new ByteArrayInputStream(compressedBytes); + GZIPInputStream gzipStream = new GZIPInputStream(byteStream); + java.io.ByteArrayOutputStream decompressedStream = new java.io.ByteArrayOutputStream(); + + byte[] buffer = new byte[1024]; + int len; + while ((len = gzipStream.read(buffer)) > 0) { + decompressedStream.write(buffer, 0, len); + } + gzipStream.close(); + + String decompressedData = decompressedStream.toString("utf-8"); + assertEquals("Decompressed data matches original", originalData, decompressedData); + + } catch (IOException e) { + fail("IOException during gzip import test: " + e.toString()); + } catch (JSONException e) { + fail("JSONException during gzip import test: " + e.toString()); + } + } + }