package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTestBase.class */
public abstract class KafkaTestBase extends TestLogger {
    protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
    protected static String brokerConnectionStrings;
    protected static Properties standardProps;
    protected static KafkaTestEnvironment kafkaServer;
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
    protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();
    protected static Properties secureProps = new Properties();

    @BeforeClass
    public static void prepare() throws Exception {
        prepare(true);
    }

    public static void prepare(boolean z) throws Exception {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Starting KafkaTestBase ");
        LOG.info("-------------------------------------------------------------------------");
        startClusters(false, z);
    }

    @AfterClass
    public static void shutDownServices() throws Exception {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Shut down KafkaTestBase ");
        LOG.info("-------------------------------------------------------------------------");
        TestStreamEnvironment.unsetAsContext();
        shutdownClusters();
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    KafkaTestBase finished");
        LOG.info("-------------------------------------------------------------------------");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Configuration getFlinkConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
        configuration.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
        configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
        configuration.setString("metrics.reporter.my_reporter.class", JMXReporter.class.getName());
        return configuration;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void startClusters() throws Exception {
        startClusters(KafkaTestEnvironment.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS));
    }

    protected static void startClusters(boolean z, boolean z2) throws Exception {
        startClusters(KafkaTestEnvironment.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(z).setHideKafkaBehindProxy(z2));
    }

    protected static void startClusters(KafkaTestEnvironment.Config config) throws Exception {
        kafkaServer = constructKafkaTestEnvionment();
        LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
        kafkaServer.prepare(config);
        standardProps = kafkaServer.getStandardProperties();
        brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
        if (config.isSecureMode()) {
            if (!kafkaServer.isSecureRunSupported()) {
                throw new IllegalStateException("Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
            }
            secureProps = kafkaServer.getSecureProperties();
        }
    }

    protected static KafkaTestEnvironment constructKafkaTestEnvionment() throws Exception {
        return (KafkaTestEnvironment) InstantiationUtil.instantiate(Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void shutdownClusters() throws Exception {
        if (secureProps != null) {
            secureProps.clear();
        }
        if (kafkaServer != null) {
            kafkaServer.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void tryExecutePropagateExceptions(StreamExecutionEnvironment streamExecutionEnvironment, String str) throws Exception {
        try {
            streamExecutionEnvironment.execute(str);
        } catch (ProgramInvocationException | JobExecutionException e) {
            int i = 0;
            for (Throwable cause = e.getCause(); !(cause instanceof SuccessException); cause = cause.getCause()) {
                if (cause != null) {
                    int i2 = i;
                    i++;
                    if (i2 != 20) {
                    }
                }
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void createTestTopic(String str, int i, int i2) {
        kafkaServer.createTestTopic(str, i, i2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void deleteTestTopic(String str) {
        kafkaServer.deleteTestTopic(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertAtLeastOnceForTopic(Properties properties, String str, int i, Set<Integer> set, long j) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        HashSet hashSet = new HashSet();
        while (System.currentTimeMillis() < currentTimeMillis + j) {
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            Iterator it = kafkaServer.getAllRecordsFromTopic(properties, str, i, 100L).iterator();
            while (it.hasNext()) {
                hashSet.add(((ConsumerRecord) it.next()).value());
            }
            if (hashSet.containsAll(set)) {
                return;
            }
        }
        Assert.fail(String.format("Expected to contain all of: <%s>, but was: <%s>", set, hashSet));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertExactlyOnceForTopic(Properties properties, String str, int i, List<Integer> list) {
        assertExactlyOnceForTopic(properties, str, i, list, 30000L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertExactlyOnceForTopic(Properties properties, String str, int i, List<Integer> list, long j) {
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties2.put("isolation.level", "read_committed");
        while (System.currentTimeMillis() < currentTimeMillis + j) {
            Iterator it = kafkaServer.getAllRecordsFromTopic(properties2, str, i, 1000L).iterator();
            while (it.hasNext()) {
                arrayList.add(((ConsumerRecord) it.next()).value());
            }
            if (arrayList.equals(list)) {
                return;
            }
            if (arrayList.size() > list.size()) {
                break;
            }
        }
        Assert.fail(String.format("Expected %s, but was: %s", formatElements(list), formatElements(arrayList)));
    }

    private String formatElements(List<Integer> list) {
        return list.size() > 50 ? String.format("number of elements: <%s>", Integer.valueOf(list.size())) : String.format("elements: <%s>", list);
    }
}
