package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/kafka/streams/integration/JoinIntegrationTest.class */
public class JoinIntegrationTest {

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER;
    private static ZkUtils zkUtils;
    private static final String APP_ID = "join-integration-test";
    private static final String INPUT_TOPIC_1 = "inputTopicLeft";
    private static final String INPUT_TOPIC_2 = "inputTopicRight";
    private static final String OUTPUT_TOPIC = "outputTopic";
    private static final Properties PRODUCER_CONFIG;
    private static final Properties RESULT_CONSUMER_CONFIG;
    private static final Properties STREAMS_CONFIG;
    private KStreamBuilder builder;
    private KStream<Long, String> leftStream;
    private KStream<Long, String> rightStream;
    private KTable<Long, String> leftTable;
    private KTable<Long, String> rightTable;
    private final List<Input<String>> input = Arrays.asList(new Input(INPUT_TOPIC_1, (String) null), new Input(INPUT_TOPIC_2, (String) null), new Input(INPUT_TOPIC_1, "A"), new Input(INPUT_TOPIC_2, "a"), new Input(INPUT_TOPIC_1, "B"), new Input(INPUT_TOPIC_2, "b"), new Input(INPUT_TOPIC_1, (String) null), new Input(INPUT_TOPIC_2, (String) null), new Input(INPUT_TOPIC_1, "C"), new Input(INPUT_TOPIC_2, "c"), new Input(INPUT_TOPIC_2, (String) null), new Input(INPUT_TOPIC_1, (String) null), new Input(INPUT_TOPIC_2, (String) null), new Input(INPUT_TOPIC_2, "d"), new Input(INPUT_TOPIC_1, "D"));
    private final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() { // from class: org.apache.kafka.streams.integration.JoinIntegrationTest.1
        public String apply(String str, String str2) {
            return str + "-" + str2;
        }
    };
    private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition();
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/JoinIntegrationTest$Input.class */
    public final class Input<V> {
        String topic;
        KeyValue<Long, V> record;
        private final long anyUniqueKey = 0;

        Input(String str, V v) {
            this.topic = str;
            this.record = KeyValue.pair(0L, v);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/JoinIntegrationTest$TopicsGotDeletedCondition.class */
    private final class TopicsGotDeletedCondition implements TestCondition {
        private TopicsGotDeletedCondition() {
        }

        public boolean conditionMet() {
            HashSet hashSet = new HashSet();
            hashSet.addAll(JavaConversions.seqAsJavaList(JoinIntegrationTest.zkUtils.getAllTopics()));
            return (hashSet.contains(JoinIntegrationTest.INPUT_TOPIC_1) || hashSet.contains(JoinIntegrationTest.INPUT_TOPIC_2) || hashSet.contains(JoinIntegrationTest.OUTPUT_TOPIC)) ? false : true;
        }
    }

    @BeforeClass
    public static void setupConfigsAndUtils() throws Exception {
        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("retries", 0);
        PRODUCER_CONFIG.put("key.serializer", LongSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
        RESULT_CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        RESULT_CONSUMER_CONFIG.put("group.id", "join-integration-test-result-consumer");
        RESULT_CONSUMER_CONFIG.put("auto.offset.reset", "earliest");
        RESULT_CONSUMER_CONFIG.put("key.deserializer", LongDeserializer.class);
        RESULT_CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("state.dir", TestUtils.tempDirectory().getPath());
        STREAMS_CONFIG.put("key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("cache.max.bytes.buffering", 0);
        zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), EmbeddedKafkaCluster.TOPIC_CREATION_TIMEOUT, EmbeddedKafkaCluster.TOPIC_CREATION_TIMEOUT, JaasUtils.isZkSecurityEnabled());
    }

    @AfterClass
    public static void release() {
        if (zkUtils != null) {
            zkUtils.close();
        }
    }

    @Before
    public void prepareTopology() throws Exception {
        CLUSTER.createTopic(INPUT_TOPIC_1);
        CLUSTER.createTopic(INPUT_TOPIC_2);
        CLUSTER.createTopic(OUTPUT_TOPIC);
        this.builder = new KStreamBuilder();
        this.leftTable = this.builder.table(INPUT_TOPIC_1, "leftTable");
        this.rightTable = this.builder.table(INPUT_TOPIC_2, "rightTable");
        this.leftStream = this.leftTable.toStream();
        this.rightStream = this.rightTable.toStream();
    }

    @After
    public void cleanup() throws Exception {
        CLUSTER.deleteTopic(INPUT_TOPIC_1);
        CLUSTER.deleteTopic(INPUT_TOPIC_2);
        CLUSTER.deleteTopic(OUTPUT_TOPIC);
        TestUtils.waitForCondition(this.topicsGotDeleted, 120000L, "Topics not deleted after 120 seconds.");
    }

    private void checkResult(String str, List<String> list) throws Exception {
        if (list != null) {
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, str, list.size(), IntegrationTestUtils.DEFAULT_TIMEOUT), Is.is(list));
        }
    }

    /* JADX WARN: Type inference failed for: r0v19, types: [java.lang.String] */
    private void runTest(List<List<String>> list) throws Exception {
        if (!$assertionsDisabled && list.size() != this.input.size()) {
            throw new AssertionError();
        }
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
        KafkaStreams kafkaStreams = new KafkaStreams(this.builder, STREAMS_CONFIG);
        try {
            kafkaStreams.start();
            long currentTimeMillis = System.currentTimeMillis();
            Iterator<List<String>> it = list.iterator();
            for (Input<String> input : this.input) {
                ?? r0 = input.topic;
                long j = currentTimeMillis + 1;
                currentTimeMillis = r0;
                IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(r0, Collections.singleton(input.record), PRODUCER_CONFIG, Long.valueOf(j));
                checkResult(OUTPUT_TOPIC, it.next());
            }
        } finally {
            kafkaStreams.close();
        }
    }

    @Test
    public void testInnerKStreamKStream() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-inner-KStream-KStream");
        List<List<String>> asList = Arrays.asList(null, null, null, Collections.singletonList("A-a"), Collections.singletonList("B-a"), Arrays.asList("A-b", "B-b"), null, null, Arrays.asList("C-a", "C-b"), Arrays.asList("A-c", "B-c", "C-c"), null, null, null, Arrays.asList("A-d", "B-d", "C-d"), Arrays.asList("D-a", "D-b", "D-c", "D-d"));
        this.leftStream.join(this.rightStream, this.valueJoiner, JoinWindows.of(10000L)).to(OUTPUT_TOPIC);
        runTest(asList);
    }

    @Test
    public void testLeftKStreamKStream() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-left-KStream-KStream");
        List<List<String>> asList = Arrays.asList(null, null, Collections.singletonList("A-null"), Collections.singletonList("A-a"), Collections.singletonList("B-a"), Arrays.asList("A-b", "B-b"), null, null, Arrays.asList("C-a", "C-b"), Arrays.asList("A-c", "B-c", "C-c"), null, null, null, Arrays.asList("A-d", "B-d", "C-d"), Arrays.asList("D-a", "D-b", "D-c", "D-d"));
        this.leftStream.leftJoin(this.rightStream, this.valueJoiner, JoinWindows.of(10000L)).to(OUTPUT_TOPIC);
        runTest(asList);
    }

    @Test
    public void testOuterKStreamKStream() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-outer-KStream-KStream");
        List<List<String>> asList = Arrays.asList(null, null, Collections.singletonList("A-null"), Collections.singletonList("A-a"), Collections.singletonList("B-a"), Arrays.asList("A-b", "B-b"), null, null, Arrays.asList("C-a", "C-b"), Arrays.asList("A-c", "B-c", "C-c"), null, null, null, Arrays.asList("A-d", "B-d", "C-d"), Arrays.asList("D-a", "D-b", "D-c", "D-d"));
        this.leftStream.outerJoin(this.rightStream, this.valueJoiner, JoinWindows.of(10000L)).to(OUTPUT_TOPIC);
        runTest(asList);
    }

    @Test
    public void testInnerKStreamKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-inner-KStream-KTable");
        List<List<String>> asList = Arrays.asList(null, null, null, null, Collections.singletonList("B-a"), null, null, null, null, null, null, null, null, null, Collections.singletonList("D-d"));
        this.leftStream.join(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        runTest(asList);
    }

    @Test
    public void testLeftKStreamKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-left-KStream-KTable");
        List<List<String>> asList = Arrays.asList(null, null, Collections.singletonList("A-null"), null, Collections.singletonList("B-a"), null, null, null, Collections.singletonList("C-null"), null, null, null, null, null, Collections.singletonList("D-d"));
        this.leftStream.leftJoin(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        runTest(asList);
    }

    @Test
    public void testInnerKTableKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-inner-KTable-KTable");
        List<List<String>> asList = Arrays.asList(null, null, null, Collections.singletonList("A-a"), Collections.singletonList("B-a"), Collections.singletonList("B-b"), Collections.singletonList((String) null), null, null, Collections.singletonList("C-c"), Collections.singletonList((String) null), null, null, null, Collections.singletonList("D-d"));
        this.leftTable.join(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        runTest(asList);
    }

    @Test
    public void testLeftKTableKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-left-KTable-KTable");
        List<List<String>> asList = Arrays.asList(null, null, Collections.singletonList("A-null"), Collections.singletonList("A-a"), Collections.singletonList("B-a"), Collections.singletonList("B-b"), Collections.singletonList((String) null), null, Collections.singletonList("C-null"), Collections.singletonList("C-c"), Collections.singletonList("C-null"), Collections.singletonList((String) null), null, null, Collections.singletonList("D-d"));
        this.leftTable.leftJoin(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        runTest(asList);
    }

    @Test
    public void testOuterKTableKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-outer-KTable-KTable");
        List<List<String>> asList = Arrays.asList(null, null, Collections.singletonList("A-null"), Collections.singletonList("A-a"), Collections.singletonList("B-a"), Collections.singletonList("B-b"), Collections.singletonList("null-b"), Collections.singletonList((String) null), Collections.singletonList("C-null"), Collections.singletonList("C-c"), Collections.singletonList("C-null"), Collections.singletonList((String) null), null, Collections.singletonList("null-d"), Collections.singletonList("D-d"));
        this.leftTable.outerJoin(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        runTest(asList);
    }

    static {
        $assertionsDisabled = !JoinIntegrationTest.class.desiredAssertionStatus();
        CLUSTER = new EmbeddedKafkaCluster(1);
        zkUtils = null;
        PRODUCER_CONFIG = new Properties();
        RESULT_CONSUMER_CONFIG = new Properties();
        STREAMS_CONFIG = new Properties();
    }
}
