/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/**
 * Base class for Kafka connector tests.
 *
 * <p>Once per test class it starts a {@link KafkaTestEnvironment} with
 * {@link #NUMBER_OF_KAFKA_SERVERS} brokers and a {@link LocalFlinkMiniCluster}, and registers the
 * mini cluster as the context for {@link TestStreamEnvironment}. Both are shut down again after
 * the last test of the class has run.
 */
public abstract class KafkaTestBase
extends TestLogger {
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);

    /** Number of Kafka brokers requested from the test environment. */
    protected static final int NUMBER_OF_KAFKA_SERVERS = 3;

    /** Number of task managers configured for the local Flink mini cluster. */
    protected static final int NUM_TMS = 1;

    /** Number of task slots configured per task manager. */
    protected static final int TM_SLOTS = 8;

    /** Parallelism passed to {@link TestStreamEnvironment#setAsContext}. */
    protected static final int PARALLELISM = 8;

    /** Broker connection string reported by the Kafka test environment; set in {@link #startClusters}. */
    protected static String brokerConnectionStrings;

    /** Standard properties reported by the Kafka test environment; set in {@link #startClusters}. */
    protected static Properties standardProps;

    /** The shared Flink mini cluster; {@code null} until {@link #startClusters} has created it. */
    protected static LocalFlinkMiniCluster flink;

    protected static FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);

    /** The Kafka test environment; {@code null} until {@link #startClusters} has instantiated it. */
    protected static KafkaTestEnvironment kafkaServer;

    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    /** Secure-mode properties; replaced in {@link #startClusters} when secure mode is requested. */
    protected static Properties secureProps = new Properties();

    /**
     * Starts the Kafka and Flink clusters in non-secure mode and installs the Flink mini cluster
     * as the stream environment context.
     *
     * @throws ClassNotFoundException if the Kafka test environment implementation class cannot be loaded
     */
    @BeforeClass
    public static void prepare() throws ClassNotFoundException {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Starting KafkaTestBase ");
        LOG.info("-------------------------------------------------------------------------");
        startClusters(false);
        TestStreamEnvironment.setAsContext(flink, PARALLELISM);
    }

    /** Removes the stream environment context and shuts down the Flink and Kafka clusters. */
    @AfterClass
    public static void shutDownServices() {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Shut down KafkaTestBase ");
        LOG.info("-------------------------------------------------------------------------");
        TestStreamEnvironment.unsetAsContext();
        shutdownClusters();
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    KafkaTestBase finished");
        LOG.info("-------------------------------------------------------------------------");
    }

    /**
     * Builds the configuration used for the local Flink mini cluster.
     *
     * @return a fresh configuration with {@link #NUM_TMS} task managers, {@link #TM_SLOTS} slots
     *     each, a zero-second fixed-delay restart delay, and a JMX metrics reporter registered
     *     under the name {@code my_reporter}
     */
    protected static Configuration getFlinkConfiguration() {
        Configuration flinkConfig = new Configuration();
        flinkConfig.setInteger("local.number-taskmanager", NUM_TMS);
        flinkConfig.setInteger("taskmanager.numberOfTaskSlots", TM_SLOTS);
        flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
        flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
        flinkConfig.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
        flinkConfig.setString("metrics.reporter.my_reporter.class", JMXReporter.class.getName());
        return flinkConfig;
    }

    /**
     * Instantiates the Kafka test environment, prepares the brokers, and starts the Flink mini
     * cluster.
     *
     * @param secureMode whether the Kafka environment should be prepared in secure mode
     * @throws ClassNotFoundException if {@code KafkaTestEnvironmentImpl} is not on the classpath
     * @throws IllegalStateException if secure mode is requested but the environment reports that
     *     it does not support secure runs
     */
    protected static void startClusters(boolean secureMode) throws ClassNotFoundException {
        // The implementation is looked up by name so that it is resolved at runtime
        // rather than being a compile-time dependency of this base class.
        Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
        kafkaServer = (KafkaTestEnvironment)InstantiationUtil.instantiate(clazz);
        LOG.info("Starting KafkaTestBase.prepare() for Kafka {}", kafkaServer.getVersion());
        kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
        standardProps = kafkaServer.getStandardProperties();
        brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
        if (secureMode) {
            if (!kafkaServer.isSecureRunSupported()) {
                throw new IllegalStateException("Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
            }
            secureProps = kafkaServer.getSecureProperties();
        }
        flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
        flink.start();
    }

    /**
     * Shuts down the Flink mini cluster and the Kafka test environment and clears the secure
     * properties.
     *
     * <p>Safe to call when start-up failed part-way: components that were never created are
     * skipped, and the Kafka environment is still shut down if stopping Flink throws.
     */
    protected static void shutdownClusters() {
        try {
            if (flink != null) {
                flink.shutdown();
            }
        } finally {
            if (secureProps != null) {
                secureProps.clear();
            }
            // kafkaServer is null when startClusters failed before instantiating it;
            // guarding here keeps the teardown from hiding the original failure behind an NPE.
            if (kafkaServer != null) {
                kafkaServer.shutdown();
            }
        }
    }

    /**
     * Executes the job and rethrows any failure, unless a {@link SuccessException} is found in
     * the failure's cause chain, in which case the method returns normally.
     *
     * @param see the environment whose job is executed
     * @param name the job name passed to {@code execute}
     * @throws Exception the original failure if no {@link SuccessException} is found in its
     *     cause chain, or any other exception thrown by {@code execute}
     */
    protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
        try {
            see.execute(name);
        }
        catch (ProgramInvocationException | JobExecutionException root) {
            Throwable cause = root.getCause();
            // Walk the cause chain looking for a SuccessException; the depth counter
            // bounds the walk so the loop always terminates.
            int depth = 0;
            while (!(cause instanceof SuccessException)) {
                if (cause == null || depth++ == 20) {
                    throw root;
                }
                cause = cause.getCause();
            }
        }
    }

    /**
     * Creates a topic through the Kafka test environment.
     *
     * @param topic name of the topic
     * @param numberOfPartitions number of partitions for the topic
     * @param replicationFactor replication factor for the topic
     */
    protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
        kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor);
    }

    /**
     * Deletes a topic through the Kafka test environment.
     *
     * @param topic name of the topic to delete
     */
    protected static void deleteTestTopic(String topic) {
        kafkaServer.deleteTestTopic(topic);
    }
}

