package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.class */
public abstract class AbstractJoinIntegrationTest {
    // Never assigned in this class; presumably set by concrete test classes — TODO confirm.
    static String appID;
    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
    static final String OUTPUT_TOPIC = "outputTopic";
    // Key shared by every record in the fixed input sequence below.
    static final long ANY_UNIQUE_KEY = 0;
    // Topology builder; not initialised here, but built and consumed by runTestWithDriver.
    StreamsBuilder builder;
    // Test parameter: when false, prepareEnvironment() sets the record cache size to 0.
    final boolean cacheEnabled;
    // Not referenced anywhere in this class.
    private static final long TIMEOUT = 30000;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    // Written to "commit.interval.ms" in setupConfigsAndUtils().
    private static final Long COMMIT_INTERVAL = 100L;
    // Static, hence shared by every test instance (and every subclass) in the same JVM.
    static final Properties STREAMS_CONFIG = new Properties();

    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
    // Supplies the base timestamp from which input and expected-record timestamps are derived.
    private final MockTime time = new MockTime();
    // The next two fields are not used in this class; presumably consumed by subclasses — TODO confirm.
    int numRecordsExpected = 0;
    AtomicBoolean finalResultReached = new AtomicBoolean(false);
    // Fixed sequence of records piped, in this order, by both runTestWithDriver variants.
    // A null value is sent as-is; the expected-result lists passed to runTestWithDriver are
    // consumed one entry per element of this list.
    private final List<Input<String>> input = Arrays.asList(
        new Input<>(INPUT_TOPIC_LEFT, null),
        new Input<>(INPUT_TOPIC_RIGHT, null),
        new Input<>(INPUT_TOPIC_LEFT, "A"),
        new Input<>(INPUT_TOPIC_RIGHT, "a"),
        new Input<>(INPUT_TOPIC_LEFT, "B"),
        new Input<>(INPUT_TOPIC_RIGHT, "b"),
        new Input<>(INPUT_TOPIC_LEFT, null),
        new Input<>(INPUT_TOPIC_RIGHT, null),
        new Input<>(INPUT_TOPIC_LEFT, "C"),
        new Input<>(INPUT_TOPIC_RIGHT, "c"),
        new Input<>(INPUT_TOPIC_RIGHT, null),
        new Input<>(INPUT_TOPIC_LEFT, null),
        new Input<>(INPUT_TOPIC_RIGHT, null),
        new Input<>(INPUT_TOPIC_RIGHT, "d"),
        new Input<>(INPUT_TOPIC_LEFT, "D")
    );
    // Joins two values as "<left>-<right>"; a null side is rendered as the text "null"
    // by string concatenation.
    final ValueJoiner<String, String, String> valueJoiner = (leftValue, rightValue) -> leftValue + "-" + rightValue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/AbstractJoinIntegrationTest$Input.class */
    /**
     * One input record together with the name of the topic it is sent to.
     * Every instance uses {@link AbstractJoinIntegrationTest#ANY_UNIQUE_KEY} as the record key.
     *
     * @param <V> type of the record value
     */
    public static final class Input<V> {
        String topic;
        KeyValue<Long, V> record;

        /**
         * @param topicName name of the topic the record is destined for
         * @param value     record value; may be {@code null}
         */
        Input(String topicName, V value) {
            this.record = new KeyValue<>(AbstractJoinIntegrationTest.ANY_UNIQUE_KEY, value);
            this.topic = topicName;
        }
    }

    /**
     * Supplies the parameter sets for the parameterized runner: one single-element
     * array holding {@code true}, followed by one holding {@code false}.
     *
     * @return the two parameter arrays, in that order
     */
    @Parameterized.Parameters(name = "caching enabled = {0}")
    public static Collection<Object[]> data() {
        final List<Object[]> parameterSets = new ArrayList<>();
        for (final boolean cachingEnabled : new boolean[] {true, false}) {
            parameterSets.add(new Object[] {cachingEnabled});
        }
        return parameterSets;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the test parameter supplied by {@link #data()}.
     *
     * @param cacheEnabled value assigned to the {@code cacheEnabled} field
     */
    public AbstractJoinIntegrationTest(boolean cacheEnabled) {
        this.cacheEnabled = cacheEnabled;
    }

    /**
     * Populates the shared {@code STREAMS_CONFIG} with the settings common to all tests:
     * offset reset policy, the embedded cluster's bootstrap servers, Long/String default
     * serdes and the commit interval.
     */
    @BeforeClass
    public static void setupConfigsAndUtils() {
        final Properties config = STREAMS_CONFIG;
        config.put("auto.offset.reset", "earliest");
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        // Class literals of the serdes that Serdes.Long() / Serdes.String() instantiate.
        config.put("default.key.serde", Serdes.LongSerde.class);
        config.put("default.value.serde", Serdes.StringSerde.class);
        config.put("commit.interval.ms", COMMIT_INTERVAL);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the two input topics and the output topic on the embedded cluster and
     * finishes the per-test part of {@code STREAMS_CONFIG}: the cache size and the
     * state directory (the root of this test's temporary folder).
     *
     * @throws InterruptedException propagated from topic creation
     */
    public void prepareEnvironment() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
        if (this.cacheEnabled) {
            // STREAMS_CONFIG is static, so a zero cache size written by an earlier
            // cache-disabled run would otherwise still be in effect here.
            STREAMS_CONFIG.remove("cache.max.bytes.buffering");
        } else {
            STREAMS_CONFIG.put("cache.max.bytes.buffering", 0);
        }
        STREAMS_CONFIG.put("state.dir", this.testFolder.getRoot().getPath());
    }

    /**
     * Deletes all topics on the embedded cluster after each test, passing a
     * wait limit of 120000 to the cluster helper.
     *
     * @throws InterruptedException propagated from the cluster helper
     */
    @After
    public void cleanup() throws InterruptedException {
        final long deletionTimeout = 120_000L;
        CLUSTER.deleteAllTopicsAndWait(deletionTimeout);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the step-by-step verification without a queryable-store check; delegates to
     * {@link #runTestWithDriver(List, String)} with a {@code null} store name.
     *
     * @param expectedResult expected output per input record, forwarded unchanged
     */
    public void runTestWithDriver(List<List<TestRecord<Long, String>>> expectedResult) {
        final String noQueryableStore = null;
        runTestWithDriver(expectedResult, noQueryableStore);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Pipes the fixed input sequence through the topology one record at a time and
     * verifies the output produced after each record.
     *
     * <p>Input records receive strictly increasing timestamps starting at
     * {@code firstTimestamp + 1}, where {@code firstTimestamp} is the mock clock's current
     * time. After each input record the next entry of {@code expectedResult} is consumed:
     * a {@code null} entry skips verification for that step; otherwise the records read
     * from the output topic must equal the entry's records with {@code firstTimestamp}
     * added to each timestamp.
     *
     * <p>The driver is closed via try-with-resources, so it is closed exactly once and a
     * failure while closing never replaces an assertion failure from the body.
     *
     * @param expectedResult one entry per input record, in input order; must contain at
     *                       least as many entries as there are input records
     * @param storeName      name of a timestamped key-value store to verify against the last
     *                       expected record, or {@code null} to skip that check
     */
    public void runTestWithDriver(List<List<TestRecord<Long, String>>> expectedResult, String storeName) {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
            final TestInputTopic<Long, String> rightTopic =
                driver.createInputTopic(INPUT_TOPIC_RIGHT, new LongSerializer(), new StringSerializer());
            final TestInputTopic<Long, String> leftTopic =
                driver.createInputTopic(INPUT_TOPIC_LEFT, new LongSerializer(), new StringSerializer());
            final TestOutputTopic<Long, String> outputTopic =
                driver.createOutputTopic(OUTPUT_TOPIC, new LongDeserializer(), new StringDeserializer());

            final Map<String, TestInputTopic<Long, String>> inputTopicsByName = new HashMap<>();
            inputTopicsByName.put(INPUT_TOPIC_RIGHT, rightTopic);
            inputTopicsByName.put(INPUT_TOPIC_LEFT, leftTopic);

            TestRecord<Long, String> expectedFinalResult = null;
            final long firstTimestamp = this.time.milliseconds();
            long eventTimestamp = firstTimestamp;
            final Iterator<List<TestRecord<Long, String>>> expectedIterator = expectedResult.iterator();

            for (final Input<String> singleInput : this.input) {
                inputTopicsByName.get(singleInput.topic)
                    .pipeInput(singleInput.record.key, singleInput.record.value, ++eventTimestamp);

                final List<TestRecord<Long, String>> expectedForStep = expectedIterator.next();
                if (expectedForStep != null) {
                    // Expected timestamps are offsets; shift them onto the mock clock's timeline.
                    final List<TestRecord<Long, String>> shiftedExpected = new ArrayList<>(expectedForStep.size());
                    for (final TestRecord<Long, String> expectedRecord : expectedForStep) {
                        shiftedExpected.add(new TestRecord<>(
                            expectedRecord.key(),
                            expectedRecord.value(),
                            null,
                            firstTimestamp + expectedRecord.timestamp()));
                    }
                    MatcherAssert.assertThat(outputTopic.readRecordsToList(), IsEqual.equalTo(shiftedExpected));
                    expectedFinalResult = shiftedExpected.get(expectedForStep.size() - 1);
                }
            }

            if (storeName != null) {
                checkQueryableStore(storeName, expectedFinalResult, driver);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Pipes the whole fixed input sequence through the topology and verifies only the
     * last record that reaches the output topic.
     *
     * <p>Input records receive strictly increasing timestamps starting at
     * {@code firstTimestamp + 1}, where {@code firstTimestamp} is the mock clock's current
     * time. The expected record is compared after adding {@code firstTimestamp} to its
     * timestamp. If nothing was written to the output topic the test fails with an
     * assertion error rather than an index error.
     *
     * <p>The driver is closed via try-with-resources, so a failure while closing never
     * replaces an assertion failure from the body.
     *
     * @param expectedFinalResult expected last output record, with a timestamp relative to
     *                            the start of the run
     * @param storeName           name of a timestamped key-value store to verify against the
     *                            expected record, or {@code null} to skip that check
     * @throws InterruptedException declared for callers; nothing in the visible body throws it
     */
    public void runTestWithDriver(TestRecord<Long, String> expectedFinalResult, String storeName) throws InterruptedException {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
            final TestInputTopic<Long, String> rightTopic =
                driver.createInputTopic(INPUT_TOPIC_RIGHT, new LongSerializer(), new StringSerializer());
            final TestInputTopic<Long, String> leftTopic =
                driver.createInputTopic(INPUT_TOPIC_LEFT, new LongSerializer(), new StringSerializer());
            final TestOutputTopic<Long, String> outputTopic =
                driver.createOutputTopic(OUTPUT_TOPIC, new LongDeserializer(), new StringDeserializer());

            final Map<String, TestInputTopic<Long, String>> inputTopicsByName = new HashMap<>();
            inputTopicsByName.put(INPUT_TOPIC_RIGHT, rightTopic);
            inputTopicsByName.put(INPUT_TOPIC_LEFT, leftTopic);

            final long firstTimestamp = this.time.milliseconds();
            long eventTimestamp = firstTimestamp;
            for (final Input<String> singleInput : this.input) {
                inputTopicsByName.get(singleInput.topic)
                    .pipeInput(singleInput.record.key, singleInput.record.value, ++eventTimestamp);
            }

            // The expected timestamp is an offset; shift it onto the mock clock's timeline.
            final TestRecord<Long, String> shiftedExpected = new TestRecord<>(
                expectedFinalResult.key(),
                expectedFinalResult.value(),
                null,
                firstTimestamp + expectedFinalResult.timestamp());

            final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();
            MatcherAssert.assertThat("expected at least one record on " + OUTPUT_TOPIC, !output.isEmpty());
            MatcherAssert.assertThat(output.get(output.size() - 1), IsEqual.equalTo(shiftedExpected));

            if (storeName != null) {
                checkQueryableStore(storeName, shiftedExpected, driver);
            }
        }
    }

    /**
     * Asserts that the named timestamped key-value store holds exactly one entry and that
     * its key, value and timestamp match the expected record.
     *
     * <p>The store iterator is opened in a try-with-resources statement so that it is
     * closed even when fetching the first entry fails, and so that a failure while closing
     * does not replace an assertion failure.
     *
     * @param storeName           name of the store to look up on the driver
     * @param expectedFinalResult record whose key, value and timestamp the single entry must match
     * @param driver              driver that owns the store
     */
    private void checkQueryableStore(String storeName, TestRecord<Long, String> expectedFinalResult, TopologyTestDriver driver) {
        try (KeyValueIterator<Long, ValueAndTimestamp<String>> all =
                 driver.<Long, String>getTimestampedKeyValueStore(storeName).all()) {
            final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();

            MatcherAssert.assertThat(onlyEntry.key, Is.is(expectedFinalResult.key()));
            MatcherAssert.assertThat(onlyEntry.value.value(), Is.is(expectedFinalResult.value()));
            MatcherAssert.assertThat(onlyEntry.value.timestamp(), Is.is(expectedFinalResult.timestamp()));
            // No second entry may exist.
            MatcherAssert.assertThat(all.hasNext(), Is.is(false));
        }
    }
}
