/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Properties;
import java.util.function.Predicate;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

/**
 * Integration test that starts two Kafka Streams clients against an embedded
 * cluster with {@code num.standby.replicas=1} and verifies whether stand-by
 * tasks are (or are not) assigned, depending on the topology.
 */
@Timeout(600)
@Category(IntegrationTest.class)
public class StandbyTaskCreationIntegrationTest {
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final String INPUT_TOPIC = "input-topic";

    private KafkaStreams client1;
    private KafkaStreams client2;

    // Set from the state-listener callbacks and polled by the test; volatile so
    // the writes are visible to the polling code (callbacks presumably run on
    // Kafka Streams' own threads).
    private volatile boolean client1IsOk = false;
    private volatile boolean client2IsOk = false;

    /**
     * Starts the embedded cluster and creates the input topic.
     *
     * @throws IOException if starting the cluster fails
     * @throws InterruptedException if interrupted while creating the topic
     */
    @BeforeAll
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        // presumably (topic, partitions, replication factor) - confirm against EmbeddedKafkaCluster
        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
    }

    /** Stops the embedded cluster once all tests have run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Closes both clients. Each client is null-checked so that a failure while
     * constructing the clients is not masked by a NullPointerException here,
     * and the second client is closed even if closing the first one throws.
     */
    @AfterEach
    public void after() {
        try {
            if (client1 != null) {
                client1.close();
            }
        } finally {
            if (client2 != null) {
                client2.close();
            }
        }
    }

    /**
     * Builds a fresh Streams configuration for one client.
     *
     * @param testInfo JUnit test metadata, used to derive a unique application id
     * @return properties with application id, bootstrap servers, a new temporary
     *         state directory, Integer default serdes and one stand-by replica
     */
    private Properties streamsConfiguration(TestInfo testInfo) {
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo);
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "app-" + safeTestName);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        // Every call gets its own temp directory, so the two clients do not share state.
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("num.standby.replicas", 1);
        return streamsConfiguration;
    }

    /**
     * A state store with logging disabled is attached to a transformer; both
     * clients must reach RUNNING with active tasks and no stand-by tasks.
     */
    @Test
    public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(TestInfo testInfo) throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        String stateStoreName = "myTransformState";
        StoreBuilder<?> keyValueStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(stateStoreName),
                Serdes.Integer(),
                Serdes.Integer())
            .withLoggingDisabled();
        builder.addStateStore(keyValueStoreBuilder);
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
                @Override
                public void init(ProcessorContext context) {
                }

                @Override
                public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
                    // No-op transformer: the test only cares about task assignment.
                    return null;
                }

                @Override
                public void close() {
                }
            }, stateStoreName);

        Topology topology = builder.build();
        createClients(topology, streamsConfiguration(testInfo), topology, streamsConfiguration(testInfo));

        setStateListenersForVerification(
            thread -> thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());

        startClients();

        waitUntilBothClientAreOK(
            "At least one client did not reach state RUNNING with active tasks but no stand-by tasks");
    }

    /**
     * A materialized source table built with {@code topology.optimization=all};
     * both clients must reach RUNNING with active tasks and stand-by tasks.
     */
    @Test
    public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(TestInfo testInfo) throws Exception {
        Properties streamsConfiguration1 = streamsConfiguration(testInfo);
        streamsConfiguration1.put("topology.optimization", "all");
        Properties streamsConfiguration2 = streamsConfiguration(testInfo);
        streamsConfiguration2.put("topology.optimization", "all");

        StreamsBuilder builder = new StreamsBuilder();
        builder.table(
            INPUT_TOPIC,
            Consumed.with(Serdes.Integer(), Serdes.Integer()),
            Materialized.as("source-table"));

        createClients(
            builder.build(streamsConfiguration1), streamsConfiguration1,
            builder.build(streamsConfiguration2), streamsConfiguration2);

        setStateListenersForVerification(
            thread -> !thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());

        startClients();

        waitUntilBothClientAreOK(
            "At least one client did not reach state RUNNING with active tasks and stand-by tasks");
    }

    /** Instantiates (but does not start) the two Streams clients under test. */
    private void createClients(Topology topology1,
                               Properties streamsConfiguration1,
                               Topology topology2,
                               Properties streamsConfiguration2) {
        client1 = new KafkaStreams(topology1, streamsConfiguration1);
        client2 = new KafkaStreams(topology2, streamsConfiguration2);
    }

    /**
     * Registers a state listener on each client that marks the client as OK once
     * it transitions to RUNNING while every local thread satisfies the condition.
     *
     * @param taskCondition predicate every local stream thread's metadata must match
     */
    private void setStateListenersForVerification(Predicate<ThreadMetadata> taskCondition) {
        client1.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING
                    && client1.metadataForLocalThreads().stream().allMatch(taskCondition)) {
                client1IsOk = true;
            }
        });
        client2.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING
                    && client2.metadataForLocalThreads().stream().allMatch(taskCondition)) {
                client2IsOk = true;
            }
        });
    }

    /** Starts both clients. */
    private void startClients() {
        client1.start();
        client2.start();
    }

    /**
     * Waits up to 30 seconds for both clients to be flagged OK.
     *
     * @param message prefix of the failure message
     * @throws Exception if the condition is not met in time or the wait is interrupted
     */
    private void waitUntilBothClientAreOK(String message) throws Exception {
        // The details are supplied lazily so that, on timeout, the message reports
        // the flags as they are at that moment rather than as they were before waiting.
        TestUtils.waitForCondition(
            () -> client1IsOk && client2IsOk,
            30_000L,
            () -> message + ": "
                + "Client 1 is " + (!client1IsOk ? "NOT " : "") + "OK, "
                + "client 2 is " + (!client2IsOk ? "NOT " : "") + "OK.");
    }
}

