/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Integration test that runs a KTable-KTable foreign-key join on two {@link KafkaStreams}
 * instances at the same time, both built from {@link #getStreamsConfiguration()} with only
 * default String serdes configured.
 *
 * <p>The left table's values have the form {@code <value>|<foreignKey>}; the part after the
 * pipe is used to look up the matching row in the right table.
 */
@Category(value={IntegrationTest.class})
public class KTableKTableForeignKeyJoinDistributedTest {
    private static final int NUM_BROKERS = 1;
    private static final String LEFT_TABLE = "left_table";
    private static final String RIGHT_TABLE = "right_table";
    private static final String OUTPUT = "output-topic";
    private static final String INPUT_TOPIC = "input-topic";

    /** Maximum time to wait for both clients to report RUNNING with the expected tasks. */
    private static final long CLIENT_READY_TIMEOUT_MS = 30000L;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    /** Consumer settings used to read the join output; populated in {@link #setupTopics()}. */
    private static final Properties CONSUMER_CONFIG = new Properties();

    @Rule
    public TestName testName = new TestName();

    private KafkaStreams client1;
    private KafkaStreams client2;

    // Written from the state-listener callbacks and read by the polling test thread, hence volatile.
    private volatile boolean client1IsOk = false;
    private volatile boolean client2IsOk = false;

    /**
     * Creates {@code INPUT_TOPIC} once for the class.
     *
     * <p>NOTE(review): no code in this class produces to or consumes from this topic.
     */
    @BeforeClass
    public static void createTopics() throws InterruptedException {
        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
    }

    /**
     * Creates the table and output topics, seeds both tables, and fills in the consumer
     * configuration used to verify the join output.
     */
    @Before
    public void setupTopics() throws InterruptedException {
        CLUSTER.createTopic(LEFT_TABLE, 1, 1);
        CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
        CLUSTER.createTopic(OUTPUT, 11, 1);

        final Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerConfig.put("acks", "all");
        producerConfig.put("key.serializer", StringSerializer.class);
        producerConfig.put("value.serializer", StringSerializer.class);

        // lhsValue4 references "rhs4", which is never written to the right table,
        // so it has no join partner.
        final List<KeyValue<String, String>> leftTable = Arrays.asList(
            new KeyValue<>("lhsValue1", "lhsValue1|rhs1"),
            new KeyValue<>("lhsValue2", "lhsValue2|rhs2"),
            new KeyValue<>("lhsValue3", "lhsValue3|rhs3"),
            new KeyValue<>("lhsValue4", "lhsValue4|rhs4"));
        final List<KeyValue<String, String>> rightTable = Arrays.asList(
            new KeyValue<>("rhs1", "rhsValue1"),
            new KeyValue<>("rhs2", "rhsValue2"),
            new KeyValue<>("rhs3", "rhsValue3"));

        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTable, producerConfig, CLUSTER.time);
        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTable, producerConfig, CLUSTER.time);

        CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put("group.id", "ktable-ktable-distributed-consumer");
        CONSUMER_CONFIG.put("key.deserializer", StringDeserializer.class);
        CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
    }

    /**
     * Closes both clients and then cleans up their state.
     *
     * <p>Each client is null-checked so that a failure before {@code createClients} ran is not
     * masked by a {@link NullPointerException} here, and the second client is closed even if
     * closing the first one throws.
     */
    @After
    public void after() {
        try {
            if (this.client1 != null) {
                this.client1.close();
            }
        } finally {
            if (this.client2 != null) {
                this.client2.close();
            }
        }
        if (this.client1 != null) {
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, this.client1);
        }
        if (this.client2 != null) {
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, this.client2);
        }
    }

    /**
     * Builds a fresh Streams configuration for one client.
     *
     * <p>Every call allocates a new temporary state directory; the application id is derived
     * from the test class and the current test name.
     *
     * @return a new {@link Properties} instance with String default serdes and
     *         {@code topology.optimization} set to {@code all}
     */
    public Properties getStreamsConfiguration() {
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "app-" + safeTestName);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        streamsConfiguration.put("topology.optimization", "all");
        return streamsConfiguration;
    }

    /**
     * Adds the foreign-key join topology to {@code builder}: the left table is joined to the
     * right table on the substring after the pipe in the left value, and the joined result
     * {@code (leftValue,rightValue)} is written to the output topic.
     *
     * @param builder the builder to add the topology to
     */
    private void configureBuilder(StreamsBuilder builder) {
        final KTable<String, String> left = builder.table(LEFT_TABLE);
        final KTable<String, String> right = builder.table(RIGHT_TABLE);

        // The foreign key is the second pipe-separated field of the left value.
        final Function<String, String> extractor = value -> value.split("\\|")[1];
        final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";

        final KTable<String, String> fkJoin = left.join(right, extractor, joiner);
        fkJoin.toStream().to(OUTPUT);
    }

    /**
     * Starts two clients with identical topologies, waits until both are RUNNING with active
     * tasks, and checks that the three joinable left rows appear in the output topic.
     */
    @Test
    public void shouldBeInitializedWithDefaultSerde() throws Exception {
        final Properties streamsConfiguration1 = this.getStreamsConfiguration();
        final Properties streamsConfiguration2 = this.getStreamsConfiguration();

        final StreamsBuilder builder1 = new StreamsBuilder();
        this.configureBuilder(builder1);
        final StreamsBuilder builder2 = new StreamsBuilder();
        this.configureBuilder(builder2);

        this.createClients(
            builder1.build(streamsConfiguration1), streamsConfiguration1,
            builder2.build(streamsConfiguration2), streamsConfiguration2);
        this.setStateListenersForVerification(thread -> !thread.activeTasks().isEmpty());
        this.startClients();
        this.waitUntilBothClientAreOK("At least one client did not reach state RUNNING with active tasks");

        final HashSet<KeyValue<String, String>> expectedResult = new HashSet<>();
        expectedResult.add(new KeyValue<>("lhsValue1", "(lhsValue1|rhs1,rhsValue1)"));
        expectedResult.add(new KeyValue<>("lhsValue2", "(lhsValue2|rhs2,rhsValue2)"));
        expectedResult.add(new KeyValue<>("lhsValue3", "(lhsValue3|rhs3,rhsValue3)"));

        final HashSet<KeyValue<String, String>> result = new HashSet<>(
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(CONSUMER_CONFIG, OUTPUT, expectedResult.size()));

        Assert.assertEquals(expectedResult, result);
        Assert.assertEquals(KafkaStreams.State.RUNNING, this.client1.state());
        Assert.assertEquals(KafkaStreams.State.RUNNING, this.client2.state());
    }

    /** Instantiates (but does not start) both Streams clients. */
    private void createClients(Topology topology1, Properties streamsConfiguration1, Topology topology2, Properties streamsConfiguration2) {
        this.client1 = new KafkaStreams(topology1, streamsConfiguration1);
        this.client2 = new KafkaStreams(topology2, streamsConfiguration2);
    }

    /**
     * Registers a state listener on each client that marks the client as OK once it
     * transitions to RUNNING while all of its local threads satisfy {@code taskCondition}.
     * The flags are only ever set to {@code true}; they are never reset.
     *
     * @param taskCondition predicate every local stream thread must satisfy
     */
    private void setStateListenersForVerification(Predicate<ThreadMetadata> taskCondition) {
        this.client1.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING
                    && this.client1.localThreadsMetadata().stream().allMatch(taskCondition)) {
                this.client1IsOk = true;
            }
        });
        this.client2.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING
                    && this.client2.localThreadsMetadata().stream().allMatch(taskCondition)) {
                this.client2IsOk = true;
            }
        });
    }

    /** Starts both Streams clients. */
    private void startClients() {
        this.client1.start();
        this.client2.start();
    }

    /**
     * Blocks until both clients have been marked OK by their state listeners, or fails after
     * {@link #CLIENT_READY_TIMEOUT_MS}.
     *
     * @param message prefix of the failure message
     */
    private void waitUntilBothClientAreOK(String message) throws Exception {
        // The details are supplied lazily so the failure message reflects the flags at the
        // time of the timeout rather than their values before waiting started.
        TestUtils.waitForCondition(
            () -> this.client1IsOk && this.client2IsOk,
            CLIENT_READY_TIMEOUT_MS,
            () -> message + ": Client 1 is " + (!this.client1IsOk ? "NOT " : "") + "OK, client 2 is " + (!this.client2IsOk ? "NOT " : "") + "OK.");
    }
}

