package org.apache.kafka.streams.tests;

import java.io.IOException;
import java.lang.Thread;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;

/* loaded from: input_file:org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.class */
public class StreamsBrokerDownResilienceTest {
    private static final int KEY = 0;
    private static final int VALUE = 1;
    private static final String SOURCE_TOPIC_1 = "streamsResilienceSource";
    private static final String SINK_TOPIC = "streamsResilienceSink";

    public static void main(String[] strArr) throws IOException {
        if (strArr.length < 2) {
            System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + strArr.length + " parameter");
            System.exit(VALUE);
        }
        System.out.println("StreamsTest instance started");
        String str = strArr[KEY];
        String str2 = strArr[VALUE];
        Properties loadProps = Utils.loadProps(str);
        if (loadProps.getProperty("bootstrap.servers") == null) {
            System.err.println("No bootstrap kafka servers specified in bootstrap.servers");
            System.exit(VALUE);
        }
        loadProps.put("application.id", "kafka-streams-resilience");
        loadProps.put("default.key.serde", Serdes.String().getClass());
        loadProps.put("default.value.serde", Serdes.String().getClass());
        loadProps.put("commit.interval.ms", 100);
        if (str2 != null && !str2.equalsIgnoreCase("none")) {
            Map<String, String> updatedConfigs = updatedConfigs(str2);
            System.out.println("Updating configs with " + updatedConfigs);
            loadProps.putAll(updatedConfigs);
        }
        if (!confirmCorrectConfigs(loadProps)) {
            System.err.println(String.format("ERROR: Did not have all required configs expected  to contain %s %s %s %s", StreamsConfig.consumerPrefix("max.poll.interval.ms"), StreamsConfig.producerPrefix("retries"), StreamsConfig.producerPrefix("request.timeout.ms"), StreamsConfig.producerPrefix("max.block.ms")));
            System.exit(VALUE);
        }
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Serde String = Serdes.String();
        streamsBuilder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(String, String)).peek(new ForeachAction<String, String>() { // from class: org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest.1
            int messagesProcessed = StreamsBrokerDownResilienceTest.KEY;

            public void apply(String str3, String str4) {
                System.out.println("received key " + str3 + " and value " + str4);
                this.messagesProcessed += StreamsBrokerDownResilienceTest.VALUE;
                System.out.println("processed" + this.messagesProcessed + "messages");
                System.out.flush();
            }
        }).to(SINK_TOPIC);
        final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), loadProps);
        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest.2
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                System.err.println("FATAL: An unexpected exception " + th);
                System.err.flush();
                kafkaStreams.close(Duration.ofSeconds(30L));
            }
        });
        System.out.println("Start Kafka Streams");
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { // from class: org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest.3
            @Override // java.lang.Runnable
            public void run() {
                kafkaStreams.close(Duration.ofSeconds(30L));
                System.out.println("Complete shutdown of streams resilience test app now");
                System.out.flush();
            }
        }));
    }

    private static boolean confirmCorrectConfigs(Properties properties) {
        return properties.containsKey(StreamsConfig.consumerPrefix("max.poll.interval.ms")) && properties.containsKey(StreamsConfig.producerPrefix("retries")) && properties.containsKey(StreamsConfig.producerPrefix("request.timeout.ms")) && properties.containsKey(StreamsConfig.producerPrefix("max.block.ms"));
    }

    private static Map<String, String> updatedConfigs(String str) {
        String[] split = str.split(",");
        HashMap hashMap = new HashMap();
        int length = split.length;
        for (int i = KEY; i < length; i += VALUE) {
            String[] split2 = split[i].split("=");
            hashMap.put(split2[KEY], split2[VALUE]);
        }
        return hashMap;
    }
}
