package org.apache.beam.examples.complete.kafkatopubsub;

import com.google.auth.Credentials;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.beam.examples.complete.kafkatopubsub.options.KafkaToPubsubOptions;
import org.apache.beam.examples.complete.kafkatopubsub.transforms.FormatTransform;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.PubSubEmulatorContainer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/apache/beam/examples/complete/kafkatopubsub/KafkaToPubsubE2ETest.class */
public class KafkaToPubsubE2ETest {
    private static final String PUBSUB_MESSAGE = "test pubsub message";
    private static final String KAFKA_TOPIC_NAME = "messages-topic";
    private static final String PROJECT_ID = "try-kafka-pubsub";

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.fromOptions(OPTIONS);

    @Rule
    public final transient TestPubsub testPubsub = TestPubsub.fromOptions(OPTIONS);
    private static final PipelineOptions OPTIONS = TestPipeline.testingPipelineOptions();
    private static final String PUBSUB_EMULATOR_IMAGE = "gcr.io/google.com/cloudsdktool/cloud-sdk:316.0.0-emulators";

    @ClassRule
    public static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = new PubSubEmulatorContainer(DockerImageName.parse(PUBSUB_EMULATOR_IMAGE));
    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:5.4.3";

    @ClassRule
    public static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME));

    @BeforeClass
    public static void beforeClass() throws Exception {
        Credentials credential = NoopCredentialFactory.fromOptions(OPTIONS).getCredential();
        OPTIONS.as(DirectOptions.class).setBlockOnRun(false);
        OPTIONS.as(GcpOptions.class).setGcpCredential(credential);
        OPTIONS.as(GcpOptions.class).setProject(PROJECT_ID);
        OPTIONS.as(PubsubOptions.class).setPubsubRootUrl("http://" + PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());
        OPTIONS.as(KafkaToPubsubOptions.class).setOutputFormat(FormatTransform.FORMAT.PUBSUB);
        OPTIONS.as(KafkaToPubsubOptions.class).setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers());
        OPTIONS.as(KafkaToPubsubOptions.class).setInputTopics(KAFKA_TOPIC_NAME);
        OPTIONS.as(KafkaToPubsubOptions.class).setKafkaConsumerConfig("auto.offset.reset=earliest");
    }

    @Before
    public void setUp() {
        OPTIONS.as(KafkaToPubsubOptions.class).setOutputTopic(this.testPubsub.topicPath().getPath());
    }

    @Test
    public void testKafkaToPubsubE2E() throws Exception {
        PipelineResult run = KafkaToPubsub.run(this.pipeline, OPTIONS.as(KafkaToPubsubOptions.class));
        sendKafkaMessage();
        this.testPubsub.assertThatTopicEventuallyReceives(new Matcher[]{Matchers.hasProperty("payload", Matchers.equalTo(PUBSUB_MESSAGE.getBytes(StandardCharsets.UTF_8)))}).waitForUpTo(Duration.standardMinutes(1L));
        try {
            run.cancel();
        } catch (UnsupportedOperationException e) {
            throw new AssertionError("Could not stop pipeline.", e);
        }
    }

    private void sendKafkaMessage() {
        try {
            KafkaProducer kafkaProducer = new KafkaProducer(ImmutableMap.of("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers(), "client.id", UUID.randomUUID().toString()), new StringSerializer(), new StringSerializer());
            Throwable th = null;
            try {
                kafkaProducer.send(new ProducerRecord(KAFKA_TOPIC_NAME, "testcontainers", PUBSUB_MESSAGE)).get();
                if (0 != 0) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    kafkaProducer.close();
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Something went wrong in kafka producer", e);
        }
    }
}
