package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.class */
public class GlobalKTableIntegrationTest {
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String globalTableTopic;
    private String streamTopic;
    private GlobalKTable<Long, String> globalTable;
    private KStream<String, Long> stream;
    private MockApiProcessorSupplier<String, String, Void, Void> supplier;

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    private final MockTime mockTime = CLUSTER.time;
    private final KeyValueMapper<String, Long, Long> keyMapper = (str, l) -> {
        return l;
    };
    private final ValueJoiner<Long, String, String> joiner = (l, str) -> {
        return l + "+" + str;
    };
    private final String globalStore = "globalStore";

    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void before() throws Exception {
        this.builder = new StreamsBuilder();
        createTopics();
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", "app-" + IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName));
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", 0);
        this.streamsConfiguration.put("commit.interval.ms", 100L);
        this.globalTable = this.builder.globalTable(this.globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as("globalStore").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()));
        this.stream = this.builder.stream(this.streamTopic, Consumed.with(Serdes.String(), Serdes.Long()));
        this.supplier = new MockApiProcessorSupplier<>();
    }

    @After
    public void whenShuttingDown() throws Exception {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
        this.stream.leftJoin(this.globalTable, this.keyMapper, this.joiner).process(this.supplier, new String[0]);
        produceInitialGlobalTableValues();
        startStreams();
        long milliseconds = this.mockTime.milliseconds();
        produceTopicValues(this.streamTopic);
        HashMap hashMap = new HashMap();
        hashMap.put("a", ValueAndTimestamp.make("1+A", milliseconds));
        hashMap.put("b", ValueAndTimestamp.make("2+B", milliseconds + 1));
        hashMap.put("c", ValueAndTimestamp.make("3+C", milliseconds + 2));
        hashMap.put("d", ValueAndTimestamp.make("4+D", milliseconds + 3));
        hashMap.put("e", ValueAndTimestamp.make("5+null", milliseconds + 4));
        TestUtils.waitForCondition(() -> {
            if (this.supplier.capturedProcessorsCount() < 2) {
                return false;
            }
            HashMap hashMap2 = new HashMap();
            hashMap2.putAll(this.supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey());
            hashMap2.putAll(this.supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey());
            return hashMap2.equals(hashMap);
        }, 30000L, "waiting for initial values");
        long milliseconds2 = this.mockTime.milliseconds();
        produceGlobalTableValues();
        ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        Assert.assertNotNull(readOnlyKeyValueStore);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(1L, "F");
        hashMap2.put(2L, "G");
        hashMap2.put(3L, "H");
        hashMap2.put(4L, "I");
        hashMap2.put(5L, "J");
        HashMap hashMap3 = new HashMap();
        TestUtils.waitForCondition(() -> {
            hashMap3.clear();
            KeyValueIterator all = readOnlyKeyValueStore.all();
            Throwable th = null;
            try {
                try {
                    all.forEachRemaining(keyValue -> {
                    });
                    if (all != null) {
                        if (0 != 0) {
                            try {
                                all.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            all.close();
                        }
                    }
                    return hashMap3.equals(hashMap2);
                } finally {
                }
            } catch (Throwable th3) {
                if (all != null) {
                    if (th != null) {
                        try {
                            all.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        all.close();
                    }
                }
                throw th3;
            }
        }, 30000L, () -> {
            return "waiting for data in replicated store\n  expected: " + hashMap2 + "\n  received: " + hashMap3;
        });
        ReadOnlyKeyValueStore readOnlyKeyValueStore2 = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
        Assert.assertNotNull(readOnlyKeyValueStore2);
        MatcherAssert.assertThat(readOnlyKeyValueStore2.get(5L), IsEqual.equalTo(ValueAndTimestamp.make("J", milliseconds2 + 4)));
        long milliseconds3 = this.mockTime.milliseconds();
        produceTopicValues(this.streamTopic);
        hashMap.put("a", ValueAndTimestamp.make("1+F", milliseconds3));
        hashMap.put("b", ValueAndTimestamp.make("2+G", milliseconds3 + 1));
        hashMap.put("c", ValueAndTimestamp.make("3+H", milliseconds3 + 2));
        hashMap.put("d", ValueAndTimestamp.make("4+I", milliseconds3 + 3));
        hashMap.put("e", ValueAndTimestamp.make("5+J", milliseconds3 + 4));
        TestUtils.waitForCondition(() -> {
            if (this.supplier.capturedProcessorsCount() < 2) {
                return false;
            }
            HashMap hashMap4 = new HashMap();
            hashMap4.putAll(this.supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey());
            hashMap4.putAll(this.supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey());
            return hashMap4.equals(hashMap);
        }, 30000L, "waiting for final values");
    }

    @Test
    public void shouldKStreamGlobalKTableJoin() throws Exception {
        this.stream.join(this.globalTable, this.keyMapper, this.joiner).process(this.supplier, new String[0]);
        produceInitialGlobalTableValues();
        startStreams();
        long milliseconds = this.mockTime.milliseconds();
        produceTopicValues(this.streamTopic);
        HashMap hashMap = new HashMap();
        hashMap.put("a", ValueAndTimestamp.make("1+A", milliseconds));
        hashMap.put("b", ValueAndTimestamp.make("2+B", milliseconds + 1));
        hashMap.put("c", ValueAndTimestamp.make("3+C", milliseconds + 2));
        hashMap.put("d", ValueAndTimestamp.make("4+D", milliseconds + 3));
        TestUtils.waitForCondition(() -> {
            if (this.supplier.capturedProcessorsCount() < 2) {
                return false;
            }
            HashMap hashMap2 = new HashMap();
            hashMap2.putAll(this.supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey());
            hashMap2.putAll(this.supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey());
            return hashMap2.equals(hashMap);
        }, 30000L, "waiting for initial values");
        long milliseconds2 = this.mockTime.milliseconds();
        produceGlobalTableValues();
        ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        Assert.assertNotNull(readOnlyKeyValueStore);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(1L, "F");
        hashMap2.put(2L, "G");
        hashMap2.put(3L, "H");
        hashMap2.put(4L, "I");
        hashMap2.put(5L, "J");
        HashMap hashMap3 = new HashMap();
        TestUtils.waitForCondition(() -> {
            hashMap3.clear();
            KeyValueIterator all = readOnlyKeyValueStore.all();
            Throwable th = null;
            try {
                try {
                    all.forEachRemaining(keyValue -> {
                    });
                    if (all != null) {
                        if (0 != 0) {
                            try {
                                all.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            all.close();
                        }
                    }
                    return hashMap3.equals(hashMap2);
                } finally {
                }
            } catch (Throwable th3) {
                if (all != null) {
                    if (th != null) {
                        try {
                            all.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        all.close();
                    }
                }
                throw th3;
            }
        }, 30000L, () -> {
            return "waiting for data in replicated store\n  expected: " + hashMap2 + "\n  received: " + hashMap3;
        });
        ReadOnlyKeyValueStore readOnlyKeyValueStore2 = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
        Assert.assertNotNull(readOnlyKeyValueStore2);
        MatcherAssert.assertThat(readOnlyKeyValueStore2.get(5L), IsEqual.equalTo(ValueAndTimestamp.make("J", milliseconds2 + 4)));
        long milliseconds3 = this.mockTime.milliseconds();
        produceTopicValues(this.streamTopic);
        hashMap.put("a", ValueAndTimestamp.make("1+F", milliseconds3));
        hashMap.put("b", ValueAndTimestamp.make("2+G", milliseconds3 + 1));
        hashMap.put("c", ValueAndTimestamp.make("3+H", milliseconds3 + 2));
        hashMap.put("d", ValueAndTimestamp.make("4+I", milliseconds3 + 3));
        hashMap.put("e", ValueAndTimestamp.make("5+J", milliseconds3 + 4));
        TestUtils.waitForCondition(() -> {
            if (this.supplier.capturedProcessorsCount() < 2) {
                return false;
            }
            HashMap hashMap4 = new HashMap();
            hashMap4.putAll(this.supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey());
            hashMap4.putAll(this.supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey());
            return hashMap4.equals(hashMap);
        }, 30000L, "waiting for final values");
    }

    @Test
    public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
        this.builder = new StreamsBuilder();
        this.globalTable = this.builder.globalTable(this.globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as(Stores.inMemoryKeyValueStore("globalStore")));
        produceInitialGlobalTableValues();
        startStreams();
        ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        Assert.assertNotNull(readOnlyKeyValueStore);
        MatcherAssert.assertThat(Long.valueOf(readOnlyKeyValueStore.approximateNumEntries()), IsEqual.equalTo(4L));
        ReadOnlyKeyValueStore readOnlyKeyValueStore2 = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
        Assert.assertNotNull(readOnlyKeyValueStore2);
        MatcherAssert.assertThat(Long.valueOf(readOnlyKeyValueStore2.approximateNumEntries()), IsEqual.equalTo(4L));
        this.kafkaStreams.close();
        startStreams();
        MatcherAssert.assertThat(Long.valueOf(((ReadOnlyKeyValueStore) IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.keyValueStore())).approximateNumEntries()), IsEqual.equalTo(4L));
        MatcherAssert.assertThat(Long.valueOf(((ReadOnlyKeyValueStore) IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore())).approximateNumEntries()), IsEqual.equalTo(4L));
    }

    @Test
    public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {
        this.builder = new StreamsBuilder();
        this.globalTable = this.builder.globalTable(this.globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as(Stores.inMemoryKeyValueStore("globalStore")));
        startStreams();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.RUNNING, Duration.ofSeconds(30L));
        this.kafkaStreams.close();
    }

    private void createTopics() throws Exception {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        this.streamTopic = "stream-" + safeUniqueTestName;
        this.globalTableTopic = "globalTable-" + safeUniqueTestName;
        CLUSTER.createTopics(this.streamTopic);
        CLUSTER.createTopic(this.globalTableTopic, 2, 1);
    }

    private void startStreams() {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams.start();
    }

    private void produceTopicValues(String str) {
        IntegrationTestUtils.produceKeyValuesSynchronously(str, Arrays.asList(new KeyValue("a", 1L), new KeyValue("b", 2L), new KeyValue("c", 3L), new KeyValue("d", 4L), new KeyValue("e", 5L)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, new Properties()), this.mockTime);
    }

    private void produceInitialGlobalTableValues() {
        IntegrationTestUtils.produceKeyValuesSynchronously(this.globalTableTopic, Arrays.asList(new KeyValue(1L, "A"), new KeyValue(2L, "B"), new KeyValue(3L, "C"), new KeyValue(4L, "D")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class), this.mockTime);
    }

    private void produceGlobalTableValues() {
        IntegrationTestUtils.produceKeyValuesSynchronously(this.globalTableTopic, Arrays.asList(new KeyValue(1L, "F"), new KeyValue(2L, "G"), new KeyValue(3L, "H"), new KeyValue(4L, "I"), new KeyValue(5L, "J")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class, new Properties()), this.mockTime);
    }
}
