package org.apache.paimon.flink.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/apache/paimon/flink/kafka/KafkaTableTestBase.class */
public abstract class KafkaTableTestBase extends AbstractTestBase {
    // Timeout in milliseconds that getStandardProps() publishes under the
    // "zookeeper.session.timeout.ms" and "zookeeper.connection.timeout.ms" keys.
    private static final int zkTimeoutMills = 30000;
    // Streaming execution environment; (re)assigned by setup() before each test.
    protected StreamExecutionEnvironment env;
    // Table environment created on top of env; (re)assigned by setup() before each test.
    protected StreamTableEnvironment tEnv;
    // Timer driving the periodic topic-status logging that setup() schedules and after() cancels.
    private final Timer loggingTimer = new Timer("Debug Logging Timer");
    // Class logger; also receives the Kafka container output when INFO is enabled (see KAFKA_CONTAINER).
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTableTestBase.class);
    // Docker network the Kafka container is attached to.
    private static final Network NETWORK = Network.newNetwork();
    // Network alias registered for the Kafka container on NETWORK.
    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";

    /**
     * Shared Kafka container, registered as a JUnit 5 extension so it is started before all tests
     * of a class and closed afterwards (see {@link KafkaContainerExtension}).
     *
     * <p>The container runs the {@code confluentinc/cp-kafka:7.2.2} image with the embedded
     * ZooKeeper, is attached to {@link #NETWORK} under the alias
     * {@link #INTER_CONTAINER_KAFKA_ALIAS}, and sets {@code KAFKA_TRANSACTION_MAX_TIMEOUT_MS} to two
     * hours (in milliseconds) and {@code KAFKA_LOG_RETENTION_MS} to {@code -1}.
     *
     * <p>The fluent configuration methods are declared to return the base container type, so the
     * result is cast back to {@link KafkaContainerExtension}; the runtime object is still the
     * anonymous subclass created here.
     */
    @RegisterExtension
    public static final KafkaContainerExtension KAFKA_CONTAINER =
            (KafkaContainerExtension)
                    new KafkaContainerExtension(DockerImageName.parse("confluentinc/cp-kafka:7.2.2")) {
                        @Override
                        protected void doStart() {
                            super.doStart();
                            // Forward the container's output into the test log only when INFO is enabled.
                            if (KafkaTableTestBase.LOG.isInfoEnabled()) {
                                followOutput(new Slf4jLogConsumer(KafkaTableTestBase.LOG));
                            }
                        }
                    }.withEmbeddedZookeeper()
                            .withNetwork(NETWORK)
                            .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
                            .withEnv(
                                    "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
                                    String.valueOf(Duration.ofHours(2).toMillis()))
                            .withEnv("KAFKA_LOG_RETENTION_MS", "-1");

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/paimon/flink/kafka/KafkaTableTestBase$KafkaContainerExtension.class */
    public static class KafkaContainerExtension extends KafkaContainer implements BeforeAllCallback, AfterAllCallback {
        private KafkaContainerExtension(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        public void beforeAll(ExtensionContext extensionContext) throws Exception {
            doStart();
        }

        public void afterAll(ExtensionContext extensionContext) throws Exception {
            close();
        }
    }

    /**
     * Creates the streaming and table environments for the test, disables unaligned checkpoints
     * on the table configuration, and starts the periodic (every 30 seconds) topic-status logger.
     */
    @BeforeEach
    public void setup() {
        env = streamExecutionEnvironmentBuilder().streamingMode().build();
        tEnv = StreamTableEnvironment.create(env);
        tEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);

        // Debug aid: dump the existing non-internal topics and their partition offsets.
        Runnable topicStatusLogger =
                () -> {
                    Map<String, TopicDescription> topics = describeExternalTopics();
                    LOG.info("Current existing topics: {}", topics.keySet());
                    logTopicPartitionStatus(topics);
                };
        scheduleTimeoutLogger(Duration.ofSeconds(30L), topicStatusLogger);
    }

    /**
     * Per-test cleanup: stops the debug logging timer first, then deletes the topics left on the
     * broker.
     *
     * @throws ExecutionException if deleting the topics fails
     * @throws InterruptedException if the thread is interrupted while waiting for the deletion
     */
    @AfterEach
    public void after() throws ExecutionException, InterruptedException {
        this.cancelTimeoutLogger();
        this.deleteTopics();
    }

    /**
     * Builds the default client properties used by the tests against {@link #KAFKA_CONTAINER}.
     *
     * @return a new, independently mutable {@link Properties} instance
     */
    public static Properties getStandardProps() {
        final Integer zkTimeout = zkTimeoutMills;
        final Properties clientConfig = new Properties();
        clientConfig.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        clientConfig.put("group.id", "flink-tests");
        clientConfig.put("enable.auto.commit", Boolean.FALSE);
        clientConfig.put("auto.offset.reset", "earliest");
        clientConfig.put("max.partition.fetch.bytes", Integer.valueOf(256));
        clientConfig.put("zookeeper.session.timeout.ms", zkTimeout);
        clientConfig.put("zookeeper.connection.timeout.ms", zkTimeout);
        return clientConfig;
    }

    /**
     * Returns the bootstrap servers string reported by {@link #KAFKA_CONTAINER}.
     *
     * @return the value of {@code KAFKA_CONTAINER.getBootstrapServers()}
     */
    public static String getBootstrapServers() {
        final String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
        return bootstrapServers;
    }

    /**
     * Tells whether a non-internal topic with the given name currently exists.
     *
     * @param str the topic name to look up
     * @return {@code true} if the name is among the described non-internal topics
     */
    protected boolean topicExists(String str) {
        final Map<String, TopicDescription> externalTopics = describeExternalTopics();
        return externalTopics.containsKey(str);
    }

    /**
     * Creates the topic with the requested number of partitions unless a topic of that name is
     * already listed by the broker. The replication factor is left unspecified.
     *
     * <p>A failure caused by {@link TopicExistsException} (for example when the topic was created
     * concurrently) is ignored; any other failure is rethrown.
     *
     * @param str the topic name
     * @param i the number of partitions for a newly created topic
     * @throws RuntimeException if listing or creating the topic fails for any other reason
     */
    public static void createTopicIfNotExists(String str, int i) {
        // try-with-resources guarantees the admin client is closed on every path, including failures.
        try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
            Set<String> existingTopics = adminClient.listTopics().names().get();
            if (!existingTopics.contains(str)) {
                NewTopic newTopic = new NewTopic(str, Optional.of(i), Optional.empty());
                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(String.format("Failed to create Kafka topic %s", str), e);
        } catch (Exception e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new RuntimeException(String.format("Failed to create Kafka topic %s", str), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Deletes the topic if the broker currently lists a topic of that name.
     *
     * <p>A failure caused by {@link UnknownTopicOrPartitionException} (for example when the topic
     * disappeared concurrently) is ignored; any other failure is rethrown.
     *
     * @param str the topic name
     * @throws RuntimeException if listing or deleting the topic fails for any other reason
     */
    public void deleteTopicIfExists(String str) {
        // try-with-resources guarantees the admin client is closed on every path, including failures.
        try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
            Set<String> existingTopics = adminClient.listTopics().names().get();
            if (existingTopics.contains(str)) {
                adminClient.deleteTopics(Collections.singleton(str)).all().get();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(String.format("Failed to drop Kafka topic %s", str), e);
        } catch (Exception e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw new RuntimeException(String.format("Failed to drop Kafka topic %s", str), e);
            }
        }
    }

    /**
     * Deletes every topic returned by the admin client's topic listing and waits for completion.
     *
     * @throws ExecutionException if listing or deleting the topics fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void deleteTopics() throws ExecutionException, InterruptedException {
        // Close the client when done; this runs after every test, so a leak would accumulate.
        try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
            Set<String> topicNames = adminClient.listTopics().names().get();
            adminClient.deleteTopics(topicNames).all().get();
        }
    }

    /**
     * Schedules the given action on the logging timer: first run immediately, then repeatedly with
     * the given period.
     *
     * <p>Failures of the action are logged rather than rethrown. An exception escaping
     * {@link TimerTask#run()} terminates the timer's thread, which would silently stop all further
     * debug logging after the first failed attempt.
     *
     * @param duration the period between successive runs
     * @param runnable the logging action to execute
     */
    private void scheduleTimeoutLogger(Duration duration, final Runnable runnable) {
        TimerTask loggingTask =
                new TimerTask() {
                    @Override
                    public void run() {
                        try {
                            runnable.run();
                        } catch (Exception e) {
                            LOG.warn("Failed to execute logging action", e);
                        }
                    }
                };
        this.loggingTimer.schedule(loggingTask, 0L, duration.toMillis());
    }

    /** Cancels the logging timer, discarding the task scheduled by {@code scheduleTimeoutLogger}. */
    private void cancelTimeoutLogger() {
        final Timer timer = this.loggingTimer;
        timer.cancel();
    }

    /**
     * Lists the topics known to the broker, drops the ones flagged as internal, and describes the
     * remaining ones.
     *
     * @return the descriptions of all non-internal topics, keyed by topic name
     * @throws RuntimeException if listing or describing the topics fails
     */
    private Map<String, TopicDescription> describeExternalTopics() {
        // try-with-resources guarantees the admin client is closed on every path, including failures.
        try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
            List<String> externalTopicNames =
                    adminClient.listTopics().listings().get().stream()
                            .filter(listing -> !listing.isInternal())
                            .map(listing -> listing.name())
                            .collect(Collectors.toList());
            return adminClient.describeTopics(externalTopicNames).all().get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to list Kafka topics", e);
        } catch (Exception e) {
            throw new RuntimeException("Failed to list Kafka topics", e);
        }
    }

    /**
     * Logs, at INFO level, the beginning and end offset of every partition of the given topics.
     *
     * <p>A short-lived consumer in the dedicated {@code flink-tests-debugging} group is used for
     * the offset lookups and is closed before the method returns.
     *
     * @param map topic descriptions keyed by topic name
     */
    private void logTopicPartitionStatus(Map<String, TopicDescription> map) {
        Properties consumerProps = getStandardProps();
        consumerProps.setProperty("group.id", "flink-tests-debugging");
        consumerProps.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        consumerProps.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());

        // Flatten every topic description into its individual partitions.
        List<TopicPartition> partitions = new ArrayList<>();
        map.forEach(
                (topic, description) ->
                        description
                                .partitions()
                                .forEach(info -> partitions.add(new TopicPartition(topic, info.partition()))));

        // This method runs periodically, so the consumer must be closed each time to avoid leaking it.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            partitions.forEach(
                    partition ->
                            LOG.info(
                                    "TopicPartition \"{}\": starting offset: {}, stopping offset: {}",
                                    partition,
                                    beginningOffsets.get(partition),
                                    endOffsets.get(partition)));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asserts that the topic exists with the expected number of partitions and that its first
     * partition has the expected number of replicas. The topic description is awaited for at most
     * 10 seconds.
     *
     * <p>Any exception raised while querying the broker fails the test, with the exception attached
     * as the cause of the failure.
     *
     * @param str the topic name
     * @param i the expected number of partitions
     * @param i2 the expected number of replicas of the first partition
     */
    public void checkTopicExists(String str, int i, int i2) {
        // try-with-resources guarantees the admin client is closed on every path, including failures.
        try (AdminClient adminClient = createAdminClient()) {
            Map<String, TopicDescription> descriptions =
                    adminClient
                            .describeTopics(Collections.singleton(str))
                            .allTopicNames()
                            .get(10L, TimeUnit.SECONDS);
            TopicDescription description = descriptions.get(str);
            Assertions.assertThat(description.partitions().size()).isEqualTo(i);
            Assertions.assertThat(description.partitions().get(0).replicas().size()).isEqualTo(i2);
        } catch (Exception e) {
            // Keep the original exception as the cause; its message alone may be null.
            Fail.fail("Failed to check Kafka topic " + str + ": " + e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asserts that the broker does not list a topic with the given name.
     *
     * <p>The check is made against the admin client's topic listing. Describing an empty collection
     * of topics always yields an empty result, so it cannot be used to detect an existing topic.
     *
     * <p>Any exception raised while querying the broker fails the test, with the exception attached
     * as the cause of the failure.
     *
     * @param str the topic name that must be absent
     */
    public void checkTopicNotExist(String str) {
        // try-with-resources guarantees the admin client is closed on every path, including failures.
        try (AdminClient adminClient = createAdminClient()) {
            Set<String> existingTopics = adminClient.listTopics().names().get();
            Assertions.assertThat(existingTopics).doesNotContain(str);
        } catch (Exception e) {
            // Keep the original exception as the cause; its message alone may be null.
            Fail.fail("Failed to check Kafka topic " + str + ": " + e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates an admin client configured with nothing but the bootstrap servers of the test
     * broker. The caller is responsible for closing it.
     *
     * @return a new {@link AdminClient}
     */
    public AdminClient createAdminClient() {
        final String bootstrapServers = getBootstrapServers();
        final Properties adminConfig = new Properties();
        adminConfig.put("bootstrap.servers", bootstrapServers);
        return AdminClient.create(adminConfig);
    }
}
