package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.class */
public class GlobalKTableIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final Properties BROKER_CONFIG = new Properties();

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER;
    private static volatile int testNo;
    private KStreamBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String globalOne;
    private String inputStream;
    private String inputTable;
    private GlobalKTable<Long, String> globalTable;
    private KStream<String, Long> stream;
    private KTable<String, Long> table;
    private ForeachAction<String, String> foreachAction;
    private final MockTime mockTime = CLUSTER.time;
    private final KeyValueMapper<String, Long, Long> keyMapper = new KeyValueMapper<String, Long, Long>() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.1
        public Long apply(String str, Long l) {
            return l;
        }
    };
    private final ValueJoiner<Long, String, String> joiner = new ValueJoiner<Long, String, String>() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.2
        public String apply(Long l, String str) {
            return l + "+" + str;
        }
    };
    private final String globalStore = "globalStore";
    final Map<String, String> results = new HashMap();

    @Before
    public void before() throws InterruptedException {
        testNo += NUM_BROKERS;
        this.builder = new KStreamBuilder();
        createTopics();
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", "globalOne-table-test-" + testNo);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", 0);
        this.streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
        this.streamsConfiguration.put("commit.interval.ms", 100);
        this.globalTable = this.builder.globalTable(Serdes.Long(), Serdes.String(), (TimestampExtractor) null, this.globalOne, "globalStore");
        this.stream = this.builder.stream(Serdes.String(), Serdes.Long(), new String[]{this.inputStream});
        this.table = this.builder.table(Serdes.String(), Serdes.Long(), this.inputTable, "table");
        this.foreachAction = new ForeachAction<String, String>() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.3
            public void apply(String str, String str2) {
                GlobalKTableIntegrationTest.this.results.put(str, str2);
            }
        };
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
        this.stream.leftJoin(this.globalTable, this.keyMapper, this.joiner).foreach(this.foreachAction);
        produceInitialGlobalTableValues();
        startStreams();
        produceTopicValues(this.inputStream);
        final HashMap hashMap = new HashMap();
        hashMap.put("a", "1+A");
        hashMap.put("b", "2+B");
        hashMap.put("c", "3+C");
        hashMap.put("d", "4+D");
        hashMap.put("e", "5+null");
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.4
            public boolean conditionMet() {
                return GlobalKTableIntegrationTest.this.results.equals(hashMap);
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for initial values");
        produceGlobalTableValues();
        final ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) this.kafkaStreams.store("globalStore", QueryableStoreTypes.keyValueStore());
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.5
            public boolean conditionMet() {
                return "J".equals(readOnlyKeyValueStore.get(5L));
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for data in replicated store");
        produceTopicValues(this.inputStream);
        hashMap.put("a", "1+F");
        hashMap.put("b", "2+G");
        hashMap.put("c", "3+H");
        hashMap.put("d", "4+I");
        hashMap.put("e", "5+J");
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.6
            public boolean conditionMet() {
                return GlobalKTableIntegrationTest.this.results.equals(hashMap);
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for final values");
    }

    @Test
    public void shouldKStreamGlobalKTableJoin() throws Exception {
        this.stream.join(this.globalTable, this.keyMapper, this.joiner).foreach(this.foreachAction);
        produceInitialGlobalTableValues();
        startStreams();
        produceTopicValues(this.inputStream);
        final HashMap hashMap = new HashMap();
        hashMap.put("a", "1+A");
        hashMap.put("b", "2+B");
        hashMap.put("c", "3+C");
        hashMap.put("d", "4+D");
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.7
            public boolean conditionMet() {
                return GlobalKTableIntegrationTest.this.results.equals(hashMap);
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for initial values");
        produceGlobalTableValues();
        final ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) this.kafkaStreams.store("globalStore", QueryableStoreTypes.keyValueStore());
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.8
            public boolean conditionMet() {
                return "J".equals(readOnlyKeyValueStore.get(5L));
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for data in replicated store");
        produceTopicValues(this.inputStream);
        hashMap.put("a", "1+F");
        hashMap.put("b", "2+G");
        hashMap.put("c", "3+H");
        hashMap.put("d", "4+I");
        hashMap.put("e", "5+J");
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.9
            public boolean conditionMet() {
                return GlobalKTableIntegrationTest.this.results.equals(hashMap);
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for final values");
    }

    @Test
    public void shouldRestoreTransactionalMessages() throws Exception {
        produceInitialGlobalTableValues(true);
        startStreams();
        final HashMap hashMap = new HashMap();
        hashMap.put(1L, "A");
        hashMap.put(2L, "B");
        hashMap.put(3L, "C");
        hashMap.put(4L, "D");
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.10
            public boolean conditionMet() {
                try {
                    ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) GlobalKTableIntegrationTest.this.kafkaStreams.store("globalStore", QueryableStoreTypes.keyValueStore());
                    HashMap hashMap2 = new HashMap();
                    KeyValueIterator all = readOnlyKeyValueStore.all();
                    while (all.hasNext()) {
                        KeyValue keyValue = (KeyValue) all.next();
                        hashMap2.put(keyValue.key, keyValue.value);
                    }
                    return hashMap2.equals(hashMap);
                } catch (InvalidStateStoreException e) {
                    return false;
                }
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for initial values");
        System.out.println("no failed test");
    }

    private void createTopics() throws InterruptedException {
        this.inputStream = "input-stream-" + testNo;
        this.inputTable = "input-table-" + testNo;
        this.globalOne = "globalOne-" + testNo;
        CLUSTER.createTopics(this.inputStream, this.inputTable);
        CLUSTER.createTopic(this.globalOne, 2, NUM_BROKERS);
    }

    private void startStreams() {
        this.kafkaStreams = new KafkaStreams(this.builder, this.streamsConfiguration);
        this.kafkaStreams.start();
    }

    private void produceTopicValues(String str) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(str, Arrays.asList(new KeyValue("a", 1L), new KeyValue("b", 2L), new KeyValue("c", 3L), new KeyValue("d", 4L), new KeyValue("e", 5L)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, new Properties()), this.mockTime);
    }

    private void produceInitialGlobalTableValues() throws ExecutionException, InterruptedException {
        produceInitialGlobalTableValues(false);
    }

    private void produceInitialGlobalTableValues(boolean z) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        if (z) {
            properties.put("transactional.id", "someid");
            properties.put("retries", Integer.valueOf(NUM_BROKERS));
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(this.globalOne, Arrays.asList(new KeyValue(1L, "A"), new KeyValue(2L, "B"), new KeyValue(3L, "C"), new KeyValue(4L, "D")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class, properties), this.mockTime, z);
    }

    private void produceGlobalTableValues() throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(this.globalOne, Arrays.asList(new KeyValue(1L, "F"), new KeyValue(2L, "G"), new KeyValue(3L, "H"), new KeyValue(4L, "I"), new KeyValue(5L, "J")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class, new Properties()), this.mockTime);
    }

    static {
        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
        BROKER_CONFIG.put("transaction.state.log.min.isr", Integer.valueOf(NUM_BROKERS));
        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
        testNo = 0;
    }
}
