package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.function.Predicate;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

/**
 * Integration test that runs a foreign-key KTable-KTable join on two {@link KafkaStreams}
 * clients sharing one application id, and verifies both the join output and that both
 * clients end up in {@link KafkaStreams.State#RUNNING}.
 */
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyJoinDistributedTest {
    private static final int NUM_BROKERS = 1;
    private static final String LEFT_TABLE = "left_table";
    private static final String RIGHT_TABLE = "right_table";
    private static final String OUTPUT = "output-topic";
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static final String INPUT_TOPIC = "input-topic";

    private KafkaStreams client1;
    private KafkaStreams client2;

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    @Rule
    public TestName testName = new TestName();

    // Written from the state-listener callbacks and read from the test thread, hence volatile.
    private volatile boolean client1IsOk = false;
    private volatile boolean client2IsOk = false;

    /** Starts the embedded cluster and creates the two-partition input topic. */
    @BeforeClass
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
    }

    /** Stops the embedded cluster once all tests in this class have run. */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Creates the table and output topics, produces the left/right table records and fills
     * the shared consumer configuration used to read the join output.
     */
    @Before
    public void setupTopics() throws InterruptedException {
        CLUSTER.createTopic(LEFT_TABLE, 1, 1);
        CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
        CLUSTER.createTopic(OUTPUT, 11, 1);

        final Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerConfig.put("acks", "all");
        producerConfig.put("key.serializer", StringSerializer.class);
        producerConfig.put("value.serializer", StringSerializer.class);

        // Left values have the form "<value>|<foreign key>"; "rhs4" has no matching right record.
        final List<KeyValue<String, String>> leftRecords = Arrays.asList(
            new KeyValue<>("lhsValue1", "lhsValue1|rhs1"),
            new KeyValue<>("lhsValue2", "lhsValue2|rhs2"),
            new KeyValue<>("lhsValue3", "lhsValue3|rhs3"),
            new KeyValue<>("lhsValue4", "lhsValue4|rhs4"));
        final List<KeyValue<String, String>> rightRecords = Arrays.asList(
            new KeyValue<>("rhs1", "rhsValue1"),
            new KeyValue<>("rhs2", "rhsValue2"),
            new KeyValue<>("rhs3", "rhsValue3"));

        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftRecords, producerConfig, CLUSTER.time);
        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightRecords, producerConfig, CLUSTER.time);

        CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put("group.id", "ktable-ktable-distributed-consumer");
        CONSUMER_CONFIG.put("key.deserializer", StringDeserializer.class);
        CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
    }

    /**
     * Closes both clients and cleans their state. Each client is guarded against {@code null}
     * so that a failure before the clients were created is not masked by a
     * {@link NullPointerException} thrown from this teardown.
     */
    @After
    public void after() {
        if (client1 != null) {
            client1.close();
        }
        if (client2 != null) {
            client2.close();
        }
        if (client1 != null) {
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, client1);
        }
        if (client2 != null) {
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, client2);
        }
    }

    /**
     * Builds a Streams configuration for one client. Every call uses a fresh temporary state
     * directory, while the application id is derived from the test class and test name.
     *
     * @return the configuration properties for a Streams client
     */
    public Properties getStreamsConfiguration() {
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        final Properties config = new Properties();
        config.put("application.id", "app-" + safeTestName);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.value.serde", Serdes.String().getClass());
        config.put("topology.optimization", "all");
        return config;
    }

    /**
     * Defines the foreign-key join: the foreign key is the part of the left value after the
     * first {@code '|'}, and the joined value is {@code "(" + left + "," + right + ")"}.
     * The result is written to the output topic.
     *
     * @param streamsBuilder the builder to add the join topology to
     */
    private void configureBuilder(StreamsBuilder streamsBuilder) {
        // Explicit type witnesses are required: without them the tables are typed with Object
        // and the lambdas below cannot call String methods on their arguments.
        streamsBuilder.<String, String>table(LEFT_TABLE)
            .join(
                streamsBuilder.<String, String>table(RIGHT_TABLE),
                (String leftValue) -> leftValue.split("\\|")[1],
                (String leftValue, String rightValue) -> "(" + leftValue + "," + rightValue + ")")
            .toStream()
            .to(OUTPUT);
    }

    /**
     * Starts two clients that rely only on the default serdes from the configuration, waits
     * until both are RUNNING with active tasks, and checks the joined records in the output.
     */
    @Test
    public void shouldBeInitializedWithDefaultSerde() throws Exception {
        final Properties config1 = getStreamsConfiguration();
        final Properties config2 = getStreamsConfiguration();

        final StreamsBuilder builder1 = new StreamsBuilder();
        configureBuilder(builder1);
        final StreamsBuilder builder2 = new StreamsBuilder();
        configureBuilder(builder2);

        createClients(builder1.build(config1), config1, builder2.build(config2), config2);
        setStateListenersForVerification(threadMetadata -> !threadMetadata.activeTasks().isEmpty());
        startClients();
        waitUntilBothClientAreOK("At least one client did not reach state RUNNING with active tasks");

        final HashSet<KeyValue<String, String>> expected = new HashSet<>();
        expected.add(new KeyValue<>("lhsValue1", "(lhsValue1|rhs1,rhsValue1)"));
        expected.add(new KeyValue<>("lhsValue2", "(lhsValue2|rhs2,rhsValue2)"));
        expected.add(new KeyValue<>("lhsValue3", "(lhsValue3|rhs3,rhsValue3)"));

        final HashSet<KeyValue<String, String>> actual = new HashSet<>(
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(CONSUMER_CONFIG, OUTPUT, expected.size()));
        Assert.assertEquals(expected, actual);

        Assert.assertEquals(KafkaStreams.State.RUNNING, client1.state());
        Assert.assertEquals(KafkaStreams.State.RUNNING, client2.state());
    }

    /** Instantiates (but does not start) both Streams clients. */
    private void createClients(Topology topology, Properties properties, Topology topology2, Properties properties2) {
        client1 = new KafkaStreams(topology, properties);
        client2 = new KafkaStreams(topology2, properties2);
    }

    /**
     * Registers state listeners that flag a client as OK when the first listener argument is
     * RUNNING and all of the client's local threads satisfy the given predicate.
     *
     * @param predicate the condition every local thread's metadata must meet
     */
    private void setStateListenersForVerification(Predicate<ThreadMetadata> predicate) {
        client1.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING
                    && client1.metadataForLocalThreads().stream().allMatch(predicate)) {
                client1IsOk = true;
            }
        });
        client2.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING
                    && client2.metadataForLocalThreads().stream().allMatch(predicate)) {
                client2IsOk = true;
            }
        });
    }

    /** Starts both Streams clients. */
    private void startClients() {
        client1.start();
        client2.start();
    }

    /**
     * Waits until both clients have been flagged OK by their state listeners.
     *
     * @param str the prefix of the failure message used if the wait does not succeed
     * @throws Exception if the wait fails or is interrupted
     */
    private void waitUntilBothClientAreOK(String str) throws Exception {
        // The details are supplied lazily so that the message reports the flags as they are
        // when it is built, not as they were before waiting started.
        TestUtils.waitForCondition(
            () -> client1IsOk && client2IsOk,
            30000L,
            () -> str + ": Client 1 is " + (!client1IsOk ? "NOT " : "") + "OK, client 2 is "
                + (!client2IsOk ? "NOT " : "") + "OK.");
    }
}
