@@ -14,6 +14,21 @@ const receiveStartCh = channel('apm:google-cloud-pubsub:receive:start')
1414const receiveFinishCh = channel ( 'apm:google-cloud-pubsub:receive:finish' )
1515const receiveErrorCh = channel ( 'apm:google-cloud-pubsub:receive:error' )
1616
17+ /**
18+ * Message-level channels for trace context injection and async context propagation:
19+ * - messagePublishCh: Inject trace context when Topic.publish() is called
20+ * - messageAckStoreCh: Store async context when message.ack() is called (PULL)
21+ * - messageAckRetrieveCh: Retrieve stored context when acknowledge() API is called (PULL)
22+ * - messageStoreCh: Propagate context when Subscription emits 'message' event (PULL)
23+ */
24+ const messagePublishCh = channel ( 'apm:google-cloud-pubsub:message:publish' )
25+ const messageAckStoreCh = channel ( 'apm:google-cloud-pubsub:message:ack-store' )
26+ const messageAckRetrieveCh = channel ( 'apm:google-cloud-pubsub:message:ack-retrieve' )
27+ const messageStoreCh = channel ( 'apm:google-cloud-pubsub:message:store' )
28+
29+ // WeakMap for passing context between LeaseManager._dispense and LeaseManager.remove
30+ const messageContexts = new WeakMap ( )
31+
1732const publisherMethods = [
1833 'createTopic' ,
1934 'updateTopic' ,
@@ -65,6 +80,21 @@ function wrapMethod (method) {
6580 if ( ! requestStartCh . hasSubscribers ) return method . apply ( this , arguments )
6681
6782 const ctx = { request, api, projectId : this . auth . _cachedProjectId }
83+
84+ /**
85+ * For acknowledge/modifyAckDeadline: retrieve stored context from consumer plugin.
86+ * These APIs only have ackIds (no Message objects), so async context is lost.
87+ * Plugin sets ctx.storedContext, which client.js uses to link the acknowledge span.
88+ */
89+ const isAckOperation = api === 'acknowledge' || api === 'modifyAckDeadline'
90+ if ( isAckOperation && request ?. ackIds ?. length > 0 ) {
91+ messageAckRetrieveCh . publish ( {
92+ ackIds : request . ackIds ,
93+ api,
94+ ctx
95+ } )
96+ }
97+
6898 return requestStartCh . runStores ( ctx , ( ) => {
6999 const cb = arguments [ arguments . length - 1 ]
70100
@@ -74,12 +104,11 @@ function wrapMethod (method) {
74104 ctx . error = error
75105 requestErrorCh . publish ( ctx )
76106 }
77-
78107 return requestFinishCh . runStores ( ctx , cb , this , ...arguments )
79108 } )
80-
81109 return method . apply ( this , arguments )
82110 }
111+
83112 return method . apply ( this , arguments )
84113 . then (
85114 response => {
@@ -108,12 +137,16 @@ function massWrap (obj, methods, wrapper) {
108137addHook ( { name : '@google-cloud/pubsub' , versions : [ '>=1.2' ] } , ( obj ) => {
109138 const Subscription = obj . Subscription
110139
140+ /**
141+ * PULL: Intercept 'message' events and propagate async context via runStores.
142+ * This ensures the consumer plugin creates spans in the correct context.
143+ */
111144 shimmer . wrap ( Subscription . prototype , 'emit' , emit => function ( eventName , message ) {
112145 if ( eventName !== 'message' || ! message ) return emit . apply ( this , arguments )
113146
114- const ctx = { }
147+ const ctx = { message }
115148 try {
116- return emit . apply ( this , arguments )
149+ return messageStoreCh . runStores ( ctx , emit , this , ... arguments )
117150 } catch ( err ) {
118151 ctx . error = err
119152 receiveErrorCh . publish ( ctx )
@@ -124,33 +157,122 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'] }, (obj) => {
124157 return obj
125158} )
126159
160+ addHook ( { name : '@google-cloud/pubsub' , versions : [ '>=1.2' ] , file : 'build/src/subscriber.js' } , ( obj ) => {
161+ const Message = obj . Message
162+
163+ /**
164+ * PULL: Capture async context when message.ack() is called.
165+ * This is our last chance before context is lost. The acknowledge() API call happens
166+ * later (often batched), and we'll retrieve this stored context to link spans.
167+ * Flow: message.ack() -> store context -> acknowledge() API -> retrieve context
168+ */
169+ if ( Message ?. prototype ?. ack ) {
170+ shimmer . wrap ( Message . prototype , 'ack' , originalAck => function ( ) {
171+ if ( this . ackId ) {
172+ const ctx = {
173+ message : this ,
174+ ackId : this . ackId
175+ }
176+
177+ return messageAckStoreCh . runStores ( ctx , originalAck , this , ...arguments )
178+ }
179+
180+ return originalAck . apply ( this , arguments )
181+ } )
182+ }
183+
184+ return obj
185+ } )
186+
187+ /**
188+ * PULL: Hook LeaseManager to track message lifecycle (dispense/remove/clear).
189+ * _dispense: Message given to handler -> create span
190+ * remove: Message removed from lease (ack/nack/timeout) -> finish span
191+ * clear: Subscription closed -> finish all remaining spans
192+ */
127193addHook ( { name : '@google-cloud/pubsub' , versions : [ '>=1.2' ] , file : 'build/src/lease-manager.js' } , ( obj ) => {
128194 const LeaseManager = obj . LeaseManager
129- const ctx = { }
195+ if ( ! LeaseManager ) {
196+ return obj
197+ }
130198
131199 shimmer . wrap ( LeaseManager . prototype , '_dispense' , dispense => function ( message ) {
132- if ( receiveStartCh . hasSubscribers ) {
133- ctx . message = message
134- return receiveStartCh . runStores ( ctx , dispense , this , ...arguments )
135- }
136- return dispense . apply ( this , arguments )
200+ const ctx = { message }
201+ messageContexts . set ( message , ctx )
202+
203+ return receiveStartCh . runStores ( ctx , dispense , this , ...arguments )
137204 } )
138205
139206 shimmer . wrap ( LeaseManager . prototype , 'remove' , remove => function ( message ) {
140- return receiveFinishCh . runStores ( ctx , remove , this , ...arguments )
207+ const ctx = messageContexts . get ( message )
208+ if ( ctx ) {
209+ messageContexts . delete ( message )
210+ }
211+
212+ return receiveFinishCh . runStores ( ctx || { message } , remove , this , ...arguments )
141213 } )
142214
143215 shimmer . wrap ( LeaseManager . prototype , 'clear' , clear => function ( ) {
144- for ( const message of this . _messages ) {
145- ctx . message = message
146- receiveFinishCh . publish ( ctx )
216+ if ( this . _messages ) {
217+ for ( const message of this . _messages . values ( ) ) {
218+ const ctx = messageContexts . get ( message )
219+ if ( ctx ) {
220+ receiveFinishCh . publish ( ctx )
221+ messageContexts . delete ( message )
222+ }
223+ }
147224 }
148225 return clear . apply ( this , arguments )
149226 } )
150227
151228 return obj
152229} )
153230
231+ /**
232+ * Inject trace context into individual messages via Topic.publish()/publishMessage().
233+ * Flow: User calls topic.publish() -> inject context (here) -> SDK batches messages ->
234+ * publish() API called -> producer plugin creates batch span + metadata
235+ */
236+ addHook ( { name : '@google-cloud/pubsub' , versions : [ '>=1.2' ] } , ( obj ) => {
237+ if ( ! obj . Topic ?. prototype ) return obj
238+
239+ if ( typeof obj . Topic . prototype . publishMessage === 'function' ) {
240+ shimmer . wrap ( obj . Topic . prototype , 'publishMessage' , publishMessage => {
241+ return function ( data , attributesOrCallback , callback ) {
242+ if ( data && typeof data === 'object' ) {
243+ if ( ! data . attributes ) data . attributes = { }
244+ messagePublishCh . publish ( {
245+ attributes : data . attributes ,
246+ pubsub : this . pubsub ,
247+ topicName : this . name
248+ } )
249+ }
250+ return publishMessage . apply ( this , arguments )
251+ }
252+ } )
253+ }
254+
255+ if ( typeof obj . Topic . prototype . publish === 'function' ) {
256+ shimmer . wrap ( obj . Topic . prototype , 'publish' , publish => function ( buffer , attributesOrCallback , callback ) {
257+ if ( typeof attributesOrCallback === 'function' || ! attributesOrCallback ) {
258+ arguments [ 1 ] = { }
259+ arguments [ 2 ] = attributesOrCallback
260+ }
261+
262+ messagePublishCh . publish ( {
263+ attributes : arguments [ 1 ] ,
264+ pubsub : this . pubsub ,
265+ topicName : this . name ,
266+ buffer
267+ } )
268+
269+ return publish . apply ( this , arguments )
270+ } )
271+ }
272+
273+ return obj
274+ } )
275+
154276addHook ( { name : '@google-cloud/pubsub' , versions : [ '>=1.2' ] } , ( obj ) => {
155277 const { PublisherClient, SchemaServiceClient, SubscriberClient } = obj . v1
156278
0 commit comments