Skip to content

[Bug]: Messages are not ACK on Pubsub starting Beam 2.52.0 on Flink Runner in detached mode #29902

@gfalcone

Description

@gfalcone

What happened?

Hello !

I have a streaming job that processes messages from Pub/Sub, and it no longer works with Beam 2.52.0 on the Flink Runner (in detached mode).

The same pipeline works fine with Beam 2.51.0.

Here is the pipeline code:

package com.mycompany.reco_videocatalog;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Minimal streaming job: builds a pipeline whose only step reads strings from a Pub/Sub
 * subscription, then starts it.
 */
public class VideoCatalogWriteJob {

    private static final Logger logger = LoggerFactory.getLogger(VideoCatalogWriteJob.class);

    /**
     * Parses the command-line arguments into {@code Options}, assembles the pipeline and runs it.
     *
     * @param args command-line arguments handed to {@code PipelineOptionsFactory.fromArgs}
     */
    public static void main(String[] args)
            throws URISyntaxException,
                    UnsupportedEncodingException,
                    IOException,
                    GeneralSecurityException,
                    InterruptedException {
        // The custom options interface is registered before the arguments are parsed.
        PipelineOptionsFactory.register(Options.class);
        final Options jobOptions =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

        final Pipeline pipeline = buildPipeline(jobOptions);

        // run() is invoked without waiting on the result it returns.
        pipeline.run();
    }

    /** Creates the pipeline and attaches its single Pub/Sub read step. */
    private static Pipeline buildPipeline(final Options jobOptions) {
        final Pipeline pipeline = Pipeline.create(jobOptions);

        // The read's output is not consumed by any further transform.
        pipeline.apply(
                "Read from Pub/Sub",
                PubsubIO.readStrings().fromSubscription(jobOptions.getOplogPubsubSubscription()));

        return pipeline;
    }
}

With the pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mycompany</groupId>
    <artifactId>data-octopus</artifactId>
    <version>1.5.2</version>
    <name>data-octopus</name>
    <description>Recommendation features computation batch and real-time jobs</description>

    <!-- Shared versions and names, referenced elsewhere in this POM as ${...}. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Used as both <source> and <target> of maven-compiler-plugin. -->
        <jdk.version>1.8</jdk.version>

        <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>

        <!-- Version of the slf4j-api dependency. -->
        <org.slf4j.version>1.7.21</org.slf4j.version>
        <!-- NOTE(review): logback, commons-math3, guava, jackson and aerospike-client versions
             are not referenced by any dependency in the POM as posted; presumably left over
             from the full project. -->
        <ch.qos.logback.version>1.4.14</ch.qos.logback.version>
        <commons-math3.version>3.6.1</commons-math3.version>
        <guava.version>33.0.0-jre</guava.version>
        <!-- Applied to every org.apache.beam artifact in this POM. This is the version the
             report says stops acking Pub/Sub messages; 2.51.0 is reported to work. -->
        <beam.version>2.52.0</beam.version>
        <jackson.version>2.16.1</jackson.version>
        <aerospike-client.version>5.3.0</aerospike-client.version>
        <!-- artifactId of the runner dependency in the flink-runner profile. -->
        <flink.artifact.name>beam-runners-flink-1.16</flink.artifact.name>
    </properties>

    <repositories>
       <repository>
            <id>artifact-registry</id>
            <url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
             </snapshots>
        </repository>
        <repository>
            <id>central</id>
            <name>Maven Repository</name>
            <url>https://repo.maven.apache.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <distributionManagement>
        <repository>
            <id>artifact-registry</id>
            <url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts/</url>
        </repository>
        <snapshotRepository>
            <id>artifact-registry</id>
            <url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts/</url>
        </snapshotRepository>
    </distributionManagement>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>

        <!-- Adds a dependency on the Beam SDK. -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>${beam.version}</version>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <id>direct-runner</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <!-- Makes the DirectRunner available when running a pipeline. -->
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <artifactId>beam-runners-direct-java</artifactId>
                    <version>${beam.version}</version>
                    <scope>runtime</scope>
                </dependency>
            </dependencies>
        </profile>
        <profile>
            <id>flink-runner</id>
            <!-- Makes the FlinkRunner available when running a pipeline. -->
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <!-- Please see the Flink Runner page for an up-to-date list
                        of supported Flink versions and their artifact names:
                    https://beam.apache.org/documentation/runners/flink/ -->
                    <artifactId>${flink.artifact.name}</artifactId>
                    <version>${beam.version}</version>
                    <scope>runtime</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.0.2</version>
                <executions>
                    <execution>
                        <id>template-videocatalog-dockerfile</id>
                        <phase>validate</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>deploy/videocatalog</outputDirectory>
                            <resources>
                                <resource>
                                    <directory>deploy/videocatalog/template</directory>
                                    <filtering>true</filtering>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.0.2</version>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/LICENSE</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            </transformers>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>bundled</shadedClassifierName>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <parallel>all</parallel>
                    <threadCount>4</threadCount>
                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.apache.maven.surefire</groupId>
                        <artifactId>surefire-junit47</artifactId>
                        <version>2.18.1</version>
                    </dependency>
                </dependencies>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.8.1</version>
                <executions>
                    <execution>
                        <id>default-deploy</id>
                        <phase>deploy</phase>
                        <goals>
                            <goal>deploy</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

And the associated configuration:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  annotations:
    meta.helm.sh/release-name: data-octopus.flink-pipelines
    meta.helm.sh/release-namespace: flink-pipelines
    rollme: Uv1n7
  creationTimestamp: "2024-01-01T15:15:31Z"
  finalizers:
  - flinkdeployments.flink.apache.org/finalizer
  generation: 5
  labels:
    app.kubernetes.io/managed-by: Helm
    environment: staging
  name: vc-realtime-ix7-staging
  namespace: flink-pipelines
  resourceVersion: "2211827076"
  uid: 328c9b41-ce8a-4165-9c9e-80141c9eb16d
spec:
  flinkConfiguration:
    env.java.opts: -Dlog4j2.formatMsgNoLookups=true
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3a://flink/ha/videocatalog-realtime
    kubernetes.jobmanager.entrypoint.args: -Ds3.access-key=${MINIO_ACCESS_KEY} -Ds3.secret-key=${MINIO_SECRET_KEY}
      -Dmetrics.reporter.dghttp.apikey=${DATADOG_API_KEY}
    kubernetes.taskmanager.entrypoint.args: -Ds3.access-key=${MINIO_ACCESS_KEY} -Ds3.secret-key=${MINIO_SECRET_KEY}
      -Dmetrics.reporter.dghttp.apikey=${DATADOG_API_KEY}
    metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
    metrics.reporter.dghttp.tags: app:videocatalog-realtime,env:staging
    metrics.scope.jm: flink.jm
    metrics.scope.jm.job: flink.jm.job
    metrics.scope.operator: flink.operator
    metrics.scope.task: flink.task
    metrics.scope.tm: flink.tm
    metrics.scope.tm.job: flink.tm.job
    s3.endpoint: http://flink-minio-svc:9000
    s3.path-style-access: "true"
    state.backend.type: hashmap
    state.checkpoint-storage: filesystem
    state.checkpoints.dir: s3a://flink/recommender/videocatalog/externalized-checkpoints
    state.savepoints.dir: s3a://flink/recommender/videocatalog/savepoints
    taskmanager.numberOfTaskSlots: "4"
    web.timeout: "100000"
    web.upload.dir: /opt/flink
  flinkVersion: v1_16
  image: quay.io/mycompany/data-octopus-flink:a64370c
  imagePullPolicy: Always
  ingress:
    annotations:
      external-dns.alpha.kubernetes.io/hostname: vc-realtime-ix7-staging.mydomain.com
      external-dns.alpha.kubernetes.io/target: ****
      external-dns.alpha.kubernetes.io/ttl: "120"
      nginx.ingress.kubernetes.io/whitelist-source-range: *****
    className: nginx-priv
    template: vc-realtime-ix7-staging.mydomain.com
  # Job definition: entry class, jar location, and the arguments handed to the pipeline.
  job:
    # The pipeline's main() parses these with PipelineOptionsFactory.fromArgs(args).
    args:
    # NOTE(review): presumably backs Options.getOplogPubsubSubscription(), the subscription
    # read by PubsubIO in the pipeline code — confirm against the Options interface.
    - --oplogPubsubSubscription=projects/my-company/subscriptions/oplog-low-latency.aerospike
    - --runner=FlinkRunner
    - --streaming=true
    # Detached mode: the setup under which the report says messages are never acked.
    - --attachedMode=false
    # NOTE(review): both intervals are presumably in milliseconds (60 s) — confirm against
    # the Flink runner pipeline option documentation.
    - --checkpointingInterval=60000
    - --latencyTrackingInterval=60000
    entryClass: com.mycompany.reco_videocatalog.VideoCatalogWriteJob
    # NOTE(review): presumably the shaded artifact (classifier "bundled") from the posted POM.
    jarURI: local:///opt/flink/flink-web-upload/data-octopus-bundled.jar
    parallelism: 8
    state: running
    upgradeMode: savepoint
  jobManager:
    replicas: 1
    resource:
      cpu: 0.5
      memory: 2g
  podTemplate:
    apiVersion: v1
    kind: Pod
    spec:
      containers:
      - env:
        - name: DATADOG_API_KEY
          valueFrom:
            secretKeyRef:
              key: datadogApiKey
              name: data-octopus
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /var/secrets/google/google-credentials
        - name: MINIO_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              key: minioAccessKey
              name: data-octopus
        - name: MINIO_SECRET_KEY
          valueFrom:
            secretKeyRef:
              key: minioSecretKey
              name: data-octopus
        name: flink-main-container
        volumeMounts:
        - mountPath: /var/secrets/google
          name: gcp-serviceaccount
        - mountPath: /var/secrets/certs
          name: mycompany-ca-cert
      imagePullSecrets:
      - name: mycompany-puller-pull-secret
      volumes:
      - name: gcp-serviceaccount
        secret:
          items:
          - key: google-credentials
            path: google-credentials
          secretName: data-octopus
      - name: mycompany-ca-cert
        secret:
          items:
          - key: ca.crt
            path: ca.crt
          secretName: mycompany-ca-cert
  serviceAccount: flink
  taskManager:
    resource:

      cpu: 2
      memory: 4g

Here is a screenshot from the Google Cloud Console showing that messages are never acked:

Screenshot 2024-01-03 at 09 41 22

Thank you for your help :)

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions