package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.class */
public abstract class AbstractJoinIntegrationTest {

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER;
    static String appID;
    private static final Long COMMIT_INTERVAL;
    static final Properties STREAMS_CONFIG;
    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
    static final String OUTPUT_TOPIC = "outputTopic";
    private static final Properties PRODUCER_CONFIG;
    private static final Properties RESULT_CONSUMER_CONFIG;
    private KafkaProducer<Long, String> producer;
    private KafkaStreams streams;
    StreamsBuilder builder;
    final boolean cacheEnabled;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
    private final long anyUniqueKey = 0;
    int numRecordsExpected = 0;
    AtomicBoolean finalResultReached = new AtomicBoolean(false);
    private final List<Input<String>> input = Arrays.asList(new Input(INPUT_TOPIC_LEFT, (String) null), new Input(INPUT_TOPIC_RIGHT, (String) null), new Input(INPUT_TOPIC_LEFT, "A"), new Input(INPUT_TOPIC_RIGHT, "a"), new Input(INPUT_TOPIC_LEFT, "B"), new Input(INPUT_TOPIC_RIGHT, "b"), new Input(INPUT_TOPIC_LEFT, (String) null), new Input(INPUT_TOPIC_RIGHT, (String) null), new Input(INPUT_TOPIC_LEFT, "C"), new Input(INPUT_TOPIC_RIGHT, "c"), new Input(INPUT_TOPIC_RIGHT, (String) null), new Input(INPUT_TOPIC_LEFT, (String) null), new Input(INPUT_TOPIC_RIGHT, (String) null), new Input(INPUT_TOPIC_RIGHT, "d"), new Input(INPUT_TOPIC_LEFT, "D"));
    final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() { // from class: org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.1
        public String apply(String str, String str2) {
            return str + "-" + str2;
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/AbstractJoinIntegrationTest$Input.class */
    public final class Input<V> {
        String topic;
        KeyValue<Long, V> record;

        Input(String str, V v) {
            this.topic = str;
            this.record = KeyValue.pair(0L, v);
        }
    }

    @Parameterized.Parameters(name = "caching enabled = {0}")
    public static Collection<Object[]> data() {
        ArrayList arrayList = new ArrayList();
        Iterator it = Arrays.asList(true, false).iterator();
        while (it.hasNext()) {
            arrayList.add(new Object[]{Boolean.valueOf(((Boolean) it.next()).booleanValue())});
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractJoinIntegrationTest(boolean z) {
        this.cacheEnabled = z;
    }

    @BeforeClass
    public static void setupConfigsAndUtils() {
        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("retries", 0);
        PRODUCER_CONFIG.put("key.serializer", LongSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
        RESULT_CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        RESULT_CONSUMER_CONFIG.put("group.id", appID + "-result-consumer");
        RESULT_CONSUMER_CONFIG.put("auto.offset.reset", "earliest");
        RESULT_CONSUMER_CONFIG.put("key.deserializer", LongDeserializer.class);
        RESULT_CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void prepareEnvironment() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
        if (!this.cacheEnabled) {
            STREAMS_CONFIG.put("cache.max.bytes.buffering", 0);
        }
        STREAMS_CONFIG.put("state.dir", this.testFolder.getRoot().getPath());
        this.producer = new KafkaProducer<>(PRODUCER_CONFIG);
    }

    @After
    public void cleanup() throws InterruptedException {
        CLUSTER.deleteAllTopicsAndWait(120000L);
    }

    private void checkResult(String str, List<String> list) throws InterruptedException {
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, str, list.size(), IntegrationTestUtils.DEFAULT_TIMEOUT), Is.is(list));
    }

    private void checkResult(String str, String str2, int i) throws InterruptedException {
        List waitUntilMinValuesRecordsReceived = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, str, i, IntegrationTestUtils.DEFAULT_TIMEOUT);
        MatcherAssert.assertThat(waitUntilMinValuesRecordsReceived.get(waitUntilMinValuesRecordsReceived.size() - 1), Is.is(str2));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void runTest(List<List<String>> list) throws Exception {
        runTest(list, (String) null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v25, types: [org.apache.kafka.clients.producer.KafkaProducer, org.apache.kafka.clients.producer.KafkaProducer<java.lang.Long, java.lang.String>] */
    public void runTest(List<List<String>> list, String str) throws Exception {
        if (!$assertionsDisabled && list.size() != this.input.size()) {
            throw new AssertionError();
        }
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
        this.streams = new KafkaStreams(this.builder.build(), STREAMS_CONFIG);
        String str2 = null;
        try {
            this.streams.start();
            long currentTimeMillis = System.currentTimeMillis();
            Iterator<List<String>> it = list.iterator();
            for (Input<String> input : this.input) {
                ?? r0 = this.producer;
                long j = currentTimeMillis + 1;
                currentTimeMillis = r0;
                r0.send(new ProducerRecord(input.topic, (Integer) null, Long.valueOf(j), input.record.key, input.record.value)).get();
                List<String> next = it.next();
                if (next != null) {
                    checkResult(OUTPUT_TOPIC, next);
                    str2 = next.get(next.size() - 1);
                }
            }
            if (str != null) {
                checkQueryableStore(str, str2);
            }
        } finally {
            this.streams.close();
        }
    }

    void runTest(String str) throws Exception {
        runTest(str, (String) null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v23, types: [org.apache.kafka.clients.producer.KafkaProducer, org.apache.kafka.clients.producer.KafkaProducer<java.lang.Long, java.lang.String>] */
    public void runTest(String str, String str2) throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
        this.streams = new KafkaStreams(this.builder.build(), STREAMS_CONFIG);
        try {
            this.streams.start();
            long currentTimeMillis = System.currentTimeMillis();
            for (Input<String> input : this.input) {
                ?? r0 = this.producer;
                long j = currentTimeMillis + 1;
                currentTimeMillis = r0;
                r0.send(new ProducerRecord(input.topic, (Integer) null, Long.valueOf(j), input.record.key, input.record.value)).get();
            }
            TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.2
                public boolean conditionMet() {
                    return AbstractJoinIntegrationTest.this.finalResultReached.get();
                }
            }, "Never received expected final result.");
            checkResult(OUTPUT_TOPIC, str, this.numRecordsExpected);
            if (str2 != null) {
                checkQueryableStore(str2, str);
            }
        } finally {
            this.streams.close();
        }
    }

    private void checkQueryableStore(String str, String str2) {
        KeyValueIterator all = ((ReadOnlyKeyValueStore) this.streams.store(str, QueryableStoreTypes.keyValueStore())).all();
        KeyValue keyValue = (KeyValue) all.next();
        try {
            MatcherAssert.assertThat(keyValue.key, Is.is(0L));
            MatcherAssert.assertThat(keyValue.value, Is.is(str2));
            MatcherAssert.assertThat(Boolean.valueOf(all.hasNext()), Is.is(false));
            all.close();
        } catch (Throwable th) {
            all.close();
            throw th;
        }
    }

    static {
        $assertionsDisabled = !AbstractJoinIntegrationTest.class.desiredAssertionStatus();
        CLUSTER = new EmbeddedKafkaCluster(1);
        COMMIT_INTERVAL = 100L;
        STREAMS_CONFIG = new Properties();
        PRODUCER_CONFIG = new Properties();
        RESULT_CONSUMER_CONFIG = new Properties();
    }
}
