package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.class */
public class KStreamKStreamIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final String LEFT_STREAM = "leftStream";
    private static final String RIGHT_STREAM = "rightStream";
    private static final String OUTPUT = "output";
    private Properties streamsConfig;
    private KafkaStreams streams;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final MockTime MOCK_TIME = CLUSTER.time;
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static final Properties PRODUCER_CONFIG = new Properties();

    @BeforeAll
    public static void startCluster() throws Exception {
        CLUSTER.start();
        CLUSTER.createTopic(LEFT_STREAM, 4, 1);
        CLUSTER.createTopic(RIGHT_STREAM, 4, 1);
        CLUSTER.createTopic(OUTPUT, 4, 1);
        CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put("group.id", "result-consumer");
        CONSUMER_CONFIG.put("key.deserializer", StringDeserializer.class);
        CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void before(TestInfo testInfo) throws IOException {
        String path = TestUtils.tempDirectory().getPath();
        this.streamsConfig = getStreamsConfig(IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo));
        this.streamsConfig.put("state.dir", path);
    }

    @AfterEach
    public void after() throws IOException {
        if (this.streams != null) {
            this.streams.close();
            this.streams = null;
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfig);
    }

    @Test
    public void shouldOuterJoin() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add(new KeyValue<>("Key-1", "value1=left-1a,value2=null"));
        hashSet.add(new KeyValue<>("Key-2", "value1=left-2a,value2=null"));
        hashSet.add(new KeyValue<>("Key-3", "value1=left-3a,value2=null"));
        hashSet.add(new KeyValue<>("Key-4", "value1=left-4a,value2=null"));
        verifyKStreamKStreamOuterJoin(hashSet);
    }

    private void verifyKStreamKStreamOuterJoin(Set<KeyValue<String, String>> set) throws Exception {
        this.streams = prepareTopology(this.streamsConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(120L));
        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("key.serializer", StringSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
        List asList = Arrays.asList(new KeyValue("Key-1", "left-1a"), new KeyValue("Key-2", "left-2a"), new KeyValue("Key-3", "left-3a"), new KeyValue("Key-4", "left-4a"));
        List asList2 = Arrays.asList(new KeyValue("Key-1", "left-1b"), new KeyValue("Key-2", "left-2b"), new KeyValue("Key-3", "left-3b"), new KeyValue("Key-4", "left-4b"));
        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, asList, PRODUCER_CONFIG, MOCK_TIME);
        MOCK_TIME.sleep(10000L);
        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, asList2, PRODUCER_CONFIG, MOCK_TIME);
        MatcherAssert.assertThat(set, IsEqual.equalTo(new HashSet(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(CONSUMER_CONFIG, OUTPUT, set.size()))));
    }

    private Properties getStreamsConfig(String str) {
        Properties properties = new Properties();
        properties.put("application.id", "KStream-KStream-join" + str);
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("auto.offset.reset", "earliest");
        properties.put("commit.interval.ms", 100L);
        properties.put("default.key.serde", Serdes.StringSerde.class);
        properties.put("default.value.serde", Serdes.StringSerde.class);
        return properties;
    }

    private static KafkaStreams prepareTopology(Properties properties) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(LEFT_STREAM).outerJoin(streamsBuilder.stream(RIGHT_STREAM), (str, str2) -> {
            return "value1=" + str + ",value2=" + str2;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10L))).to(OUTPUT);
        return new KafkaStreams(streamsBuilder.build(properties), properties);
    }
}
