diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy index 4d91429e4c0..e34cd1ab08e 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy @@ -1,16 +1,12 @@ import datadog.trace.instrumentation.spark.AbstractSparkListenerTest import datadog.trace.instrumentation.spark.DatadogSpark212Listener import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener class SparkListenerTest extends AbstractSparkListenerTest { @Override - protected DatadogSpark212Listener getTestDatadogSparkListener() { + protected SparkListener getTestDatadogSparkListener() { def conf = new SparkConf() return new DatadogSpark212Listener(conf, "some_app_id", "some_version") } - - @Override - protected DatadogSpark212Listener getTestDatadogSparkListener(SparkConf conf) { - return new DatadogSpark212Listener(conf, "some_app_id", "some_version") - } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy index cbfb88b13df..a358f95e4f3 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy @@ -1,16 +1,12 @@ import datadog.trace.instrumentation.spark.AbstractSparkListenerTest import datadog.trace.instrumentation.spark.DatadogSpark213Listener import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener class SparkListenerTest extends AbstractSparkListenerTest { @Override - protected DatadogSpark213Listener getTestDatadogSparkListener() { + protected SparkListener getTestDatadogSparkListener() { def conf = new SparkConf() return new DatadogSpark213Listener(conf, "some_app_id", "some_version") } - - @Override - protected DatadogSpark213Listener getTestDatadogSparkListener(SparkConf conf) { - return new DatadogSpark213Listener(conf, "some_app_id", "some_version") - } } diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 6d17ad16e80..511515bc35c 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -168,7 +168,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp } public void setupOpenLineage(DDTraceId traceId) { - log.debug("Setting up OpenLineage configuration with trace id {}", traceId); + log.debug("Setting up OpenLineage configuration"); if (openLineageSparkListener != null) { openLineageSparkConf.set("spark.openlineage.transport.type", "composite"); openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true"); @@ -273,14 +273,17 @@ private void captureOpenlineageContextIfPresent( AgentTracer.SpanBuilder builder, OpenlineageParentContext context) { builder.asChildOf(context); - log.debug("Captured Openlineage context: {}, with trace_id: {}", context, context.getTraceId()); + builder.withSpanId(context.getChildRootSpanId()); + + log.debug( + "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}", + context, + context.getTraceId(), + context.getChildRootSpanId()); builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace()); builder.withTag("openlineage_parent_job_name", context.getParentJobName()); builder.withTag("openlineage_parent_run_id", context.getParentRunId()); - builder.withTag("openlineage_root_parent_job_namespace", context.getRootParentJobNamespace()); - builder.withTag("openlineage_root_parent_job_name", context.getRootParentJobName()); - builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId()); } @Override diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java index 51977d73f6c..6a0b28a70c0 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java @@ -7,7 +7,10 @@ import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; -import datadog.trace.util.FNV64Hash; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -24,23 +27,16 @@ public class OpenlineageParentContext implements AgentSpanContext { private final DDTraceId traceId; private final long spanId; + private final long childRootSpanId; private final String parentJobNamespace; private final String parentJobName; private final String parentRunId; - private final String rootParentJobNamespace; - private final String rootParentJobName; - private final String rootParentRunId; public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE = "spark.openlineage.parentJobNamespace"; public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName"; public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId"; - public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE = - "spark.openlineage.rootParentJobNamespace"; - public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAME = - "spark.openlineage.rootParentJobName"; - public static final String OPENLINEAGE_ROOT_PARENT_RUN_ID = "spark.openlineage.rootParentRunId"; public static Optional from(SparkConf sparkConf) { if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE) @@ -49,84 +45,68 @@ public static Optional from(SparkConf sparkConf) { return Optional.empty(); } - if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) { - log.debug("Found parent info, but not root parent info. Can't construct valid trace id."); - return Optional.empty(); - } - String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE); String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME); String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID); if (!UUID.matcher(parentRunId).matches()) { - log.debug("OpenLineage parent run id is not a valid UUID: {}", parentRunId); - return Optional.empty(); - } - - String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE); - String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME); - String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID); - - if (!UUID.matcher(rootParentRunId).matches()) { - log.debug("OpenLineage root parent run id is not a valid UUID: {}", parentRunId); return Optional.empty(); } return Optional.of( - new OpenlineageParentContext( - parentJobNamespace, - parentJobName, - parentRunId, - rootParentJobNamespace, - rootParentJobName, - rootParentRunId)); + new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId)); } - OpenlineageParentContext( - String parentJobNamespace, - String parentJobName, - String parentRunId, - String rootParentJobNamespace, - String rootParentJobName, - String rootParentRunId) { + OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) { log.debug( - "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}, rootParentJobNamespace: {}, rootParentJobName: {}, rootParentRunId: {}", + "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}", parentJobNamespace, parentJobName, - parentRunId, - rootParentJobNamespace, - rootParentJobName, - rootParentRunId); + parentRunId); this.parentJobNamespace = parentJobNamespace; this.parentJobName = parentJobName; this.parentRunId = parentRunId; - this.rootParentJobNamespace = rootParentJobNamespace; - this.rootParentJobName = rootParentJobName; - this.rootParentRunId = rootParentRunId; + MessageDigest digest = null; + try { + digest = MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException e) { + log.debug("Unable to find SHA-256 algorithm", e); + } - if (this.rootParentRunId != null) { - traceId = computeTraceId(this.rootParentRunId); - spanId = computeSpanId(this.parentRunId); - } else if (this.parentRunId != null) { - traceId = computeTraceId(this.parentRunId); - spanId = computeSpanId(this.parentRunId); + if (digest != null && parentJobNamespace != null && parentRunId != null) { + traceId = computeTraceId(digest, parentJobNamespace, parentJobName, parentRunId); + spanId = DDSpanId.ZERO; + + childRootSpanId = + computeChildRootSpanId(digest, parentJobNamespace, parentJobName, parentRunId); } else { traceId = DDTraceId.ZERO; spanId = DDSpanId.ZERO; + + childRootSpanId = DDSpanId.ZERO; } log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId); } - private long computeSpanId(String runId) { - return FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A); + private long computeChildRootSpanId( + MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) { + byte[] inputBytes = + (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8); + byte[] hash = digest.digest(inputBytes); + + return ByteBuffer.wrap(hash).getLong(); } - private DDTraceId computeTraceId(String runId) { - log.debug("Generating traceID from runId: {}", runId); - return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A)); + private DDTraceId computeTraceId( + MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) { + byte[] inputBytes = + (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8); + byte[] hash = digest.digest(inputBytes); + + return DDTraceId.from(ByteBuffer.wrap(hash).getLong()); } @Override @@ -139,6 +119,10 @@ public long getSpanId() { return spanId; } + public long getChildRootSpanId() { + return childRootSpanId; + } + @Override public AgentTraceCollector getTraceCollector() { return AgentTracer.NoopAgentTraceCollector.INSTANCE; @@ -175,16 +159,4 @@ public String getParentJobName() { public String getParentRunId() { return parentRunId; } - - public String getRootParentJobNamespace() { - return rootParentJobNamespace; - } - - public String getRootParentJobName() { - return rootParentJobName; - } - - public String getRootParentRunId() { - return rootParentRunId; - } } diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy index 47367fbef82..2e910eafe09 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy @@ -4,10 +4,10 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding import com.datadoghq.sketch.ddsketch.proto.DDSketch import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore import datadog.trace.agent.test.AgentTestRunner -import org.apache.spark.SparkConf import org.apache.spark.Success$ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.JobSucceeded$ +import org.apache.spark.scheduler.SparkListener import org.apache.spark.scheduler.SparkListenerApplicationEnd import org.apache.spark.scheduler.SparkListenerApplicationStart import org.apache.spark.scheduler.SparkListenerExecutorAdded @@ -30,8 +30,7 @@ import scala.collection.JavaConverters abstract class AbstractSparkListenerTest extends AgentTestRunner { - protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener() - protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener(SparkConf conf) + protected abstract SparkListener getTestDatadogSparkListener() protected applicationStartEvent(time=0L) { // Constructor of SparkListenerApplicationStart changed starting spark 3.0 @@ -464,22 +463,6 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner { } } - def "sets up OpenLineage trace id properly"() { - setup: - def conf = new SparkConf() - conf.set("spark.openlineage.parentRunId", "ad3b6baa-8d88-3b38-8dbe-f06232249a84") - conf.set("spark.openlineage.parentJobNamespace", "default") - conf.set("spark.openlineage.parentJobName", "dag-push-to-s3-spark.upload_to_s3") - conf.set("spark.openlineage.rootParentRunId", "01964820-5280-7674-b04e-82fbed085f39") - conf.set("spark.openlineage.rootParentJobNamespace", "default") - conf.set("spark.openlineage.rootParentJobName", "dag-push-to-s3-spark") - def listener = getTestDatadogSparkListener(conf) - - expect: - listener.onApplicationStart(applicationStartEvent(1000L)) - assert listener.openLineageSparkConf.get("spark.openlineage.run.tags").contains("13959090542865903119") - } - def "test lastJobFailed is not set when job is cancelled"() { setup: def listener = getTestDatadogSparkListener() diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy index 0287ab06e87..34ec29b42b4 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy @@ -1,10 +1,11 @@ package datadog.trace.instrumentation.spark +import datadog.trace.api.DDSpanId import org.apache.spark.SparkConf import spock.lang.Specification class OpenlineageParentContextTest extends Specification { - def "should create OpenLineageParentContext with particular trace id based on root parent id" () { + def "should create none empty OpenLineageParentContext using SHA-256 for TraceID and root span SpanId if all required fields are present" () { given: SparkConf mockSparkConf = Mock(SparkConf) @@ -12,11 +13,9 @@ class OpenlineageParentContextTest extends Specification { mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true - mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> parentRunId - mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId then: Optional parentContext = OpenlineageParentContext.from(mockSparkConf) @@ -24,16 +23,16 @@ class OpenlineageParentContextTest extends Specification { parentContext.get().getParentJobNamespace() == "default" parentContext.get().getParentJobName() == "dag-push-to-s3-spark.upload_to_s3" - parentContext.get().getRootParentRunId() == rootParentRunId - parentContext.get().getParentRunId() == parentRunId + parentContext.get().getParentRunId() == expectedParentRunId - parentContext.get().traceId.toString() == expectedTraceId - parentContext.get().spanId.toString() == expectedSpanId + parentContext.get().traceId.toLong() == expectedTraceId + parentContext.get().spanId == DDSpanId.ZERO + parentContext.get().childRootSpanId == expectedRootSpanId where: - rootParentRunId | parentRunId | expectedTraceId | expectedSpanId - "01964820-5280-7674-b04e-82fbed085f39" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "13959090542865903119" | "2903780135964948649" - "1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e" | "6f6f6f6f-5e5e-4d4d-3c3c-2b2b2b2b2b2b" | "15830118871223350489" | "8020087091656517257" + parentRunId | expectedParentRunId | expectedTraceId | expectedRootSpanId + "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | 0xa475569dbce5e6cfL | 0xa475569dbce5e6cfL + "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | 0x31da6680bd14991bL | 0x31da6680bd14991bL } def "should create empty OpenLineageParentContext if any required field is missing" () { @@ -44,24 +43,20 @@ class OpenlineageParentContextTest extends Specification { mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> jobNamespacePresent mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> jobNamePresent mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runIdPresent - mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentIdPresent then: Optional parentContext = OpenlineageParentContext.from(mockSparkConf) parentContext.isPresent() == expected where: - jobNamespacePresent | jobNamePresent | runIdPresent | rootParentIdPresent | expected - true | true | true | false | false - true | true | false | false | false - true | true | true | false | false - true | true | false | true | false - true | false | true | false | false - false | true | true | true | false - true | false | false | false | false - false | true | false | false | false - false | false | true | true | false - false | false | false | false | false + jobNamespacePresent | jobNamePresent | runIdPresent | expected + true | true | false | false + true | false | true | false + false | true | true | false + true | false | false | false + false | true | false | false + false | false | true | false + false | false | false | false } def "should only generate a non-empty OpenlineageParentContext if parentRunId is a valid UUID" () { @@ -72,12 +67,9 @@ class OpenlineageParentContextTest extends Specification { mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true - mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runId - mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> runId - then: Optional parentContext = OpenlineageParentContext.from(mockSparkConf) @@ -91,33 +83,5 @@ class OpenlineageParentContextTest extends Specification { "6afeb6ee-729d-37f7-b8e6f47ca694" | false "6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true } - - def "should only generate a non-empty OpenlineageParentContext if rootParentRunId is a valid UUID" () { - given: - SparkConf mockSparkConf = Mock(SparkConf) - - when: - mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true - mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true - mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true - mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true - mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" - mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" - mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> "6afeb6ee-729d-37f7-ad73-b8e6f47ca694" - mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId - - - then: - Optional parentContext = OpenlineageParentContext.from(mockSparkConf) - parentContext.isPresent() == expected - - where: - rootParentRunId | expected - "6afeb6ee-729d-37f7-ad73-b8e6f47ca694" | true - " " | false - "invalid-uuid" | false - "6afeb6ee-729d-37f7-b8e6f47ca694" | false - "6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true - } }