package org.apache.kafka.streams.tests;

import java.io.File;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.test.TestUtils;

/* loaded from: input_file:org/apache/kafka/streams/tests/BrokerCompatibilityTest.class */
public class BrokerCompatibilityTest {
    private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
    private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";

    public static void main(String[] strArr) throws Exception {
        System.out.println("StreamsTest instance started");
        String str = strArr.length > 0 ? strArr[0] : "localhost:9092";
        File file = new File(strArr.length > 1 ? strArr[1] : TestUtils.tempDirectory().getAbsolutePath());
        file.mkdir();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("application.id", "kafka-streams-system-test-broker-compatibility");
        properties.put("state.dir", file.toString());
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.serde", Serdes.String().getClass());
        properties.put("value.serde", Serdes.String().getClass());
        properties.put("commit.interval.ms", 100);
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.stream(new String[]{SOURCE_TOPIC}).to(SINK_TOPIC);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, properties);
        System.out.println("start Kafka Streams");
        kafkaStreams.start();
        System.out.println("send data");
        Properties properties2 = new Properties();
        properties2.put("bootstrap.servers", str);
        properties2.put("key.serializer", StringSerializer.class);
        properties2.put("value.serializer", StringSerializer.class);
        new KafkaProducer(properties2).send(new ProducerRecord(SOURCE_TOPIC, "key", "value"));
        System.out.println("wait for result");
        loopUntilRecordReceived(str);
        System.out.println("close Kafka Streams");
        kafkaStreams.close();
    }

    private static void loopUntilRecordReceived(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("group.id", "broker-compatibility-consumer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Collections.singletonList(SINK_TOPIC));
        while (true) {
            Iterator it = kafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                if (((String) consumerRecord.key()).equals("key") && ((String) consumerRecord.value()).equals("value")) {
                    kafkaConsumer.close();
                    return;
                }
            }
        }
    }
}
