package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.class */
public class EosV2UpgradeIntegrationTest {

    @Parameterized.Parameter
    public boolean injectError;
    private static String applicationId;
    private static final int NUM_TOPIC_PARTITIONS = 4;
    private static final String CONSUMER_GROUP_ID = "readCommitted";
    private static final String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
    private static final String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
    private static final String APP_DIR_1 = "appDir1";
    private static final String APP_DIR_2 = "appDir2";
    private static final String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception, or there are too many exceptions thrown, please check standard error log for more info.";
    private static final int MAX_POLL_INTERVAL_MS = (int) Duration.ofSeconds(100).toMillis();
    private static final long MAX_WAIT_TIME_MS = Duration.ofMinutes(1).toMillis();
    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CLOSE = Collections.unmodifiableList(Arrays.asList(KeyValue.pair(KafkaStreams.State.RUNNING, KafkaStreams.State.PENDING_SHUTDOWN), KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING)));
    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CRASH = Collections.unmodifiableList(Collections.singletonList(KeyValue.pair(KafkaStreams.State.PENDING_ERROR, KafkaStreams.State.ERROR)));
    private static final int NUM_BROKERS = 3;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false")));

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    private final String storeName = "store";
    private final IntegrationTestUtils.StableAssignmentListener assignmentListener = new IntegrationTestUtils.StableAssignmentListener();
    private final AtomicBoolean errorInjectedClient1 = new AtomicBoolean(false);
    private final AtomicBoolean errorInjectedClient2 = new AtomicBoolean(false);
    private final AtomicBoolean commitErrorInjectedClient1 = new AtomicBoolean(false);
    private final AtomicBoolean commitErrorInjectedClient2 = new AtomicBoolean(false);
    private final AtomicInteger commitCounterClient1 = new AtomicInteger(-1);
    private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
    private final AtomicInteger commitRequested = new AtomicInteger(0);
    private int testNumber = 0;
    private Map<String, Integer> exceptionCounts = new HashMap<String, Integer>() { // from class: org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.1
        {
            put(EosV2UpgradeIntegrationTest.APP_DIR_1, 0);
            put(EosV2UpgradeIntegrationTest.APP_DIR_2, 0);
        }
    };
    private volatile boolean hasUnexpectedError = false;

    /* loaded from: input_file:org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest$ErrorInjector.class */
    private class ErrorInjector extends KafkaProducer<byte[], byte[]> {
        private final AtomicBoolean crash;

        public ErrorInjector(Map<String, Object> map) {
            super(map, new ByteArraySerializer(), new ByteArraySerializer());
            if (map.get("client.id").toString().contains(EosV2UpgradeIntegrationTest.APP_DIR_1)) {
                this.crash = EosV2UpgradeIntegrationTest.this.commitErrorInjectedClient1;
            } else {
                this.crash = EosV2UpgradeIntegrationTest.this.commitErrorInjectedClient2;
            }
        }

        public void commitTransaction() {
            super.flush();
            if (this.crash.compareAndSet(true, false)) {
                throw new RuntimeException("Injected producer commit test exception.");
            }
            super.commitTransaction();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest$KeyPartitioner.class */
    public static class KeyPartitioner implements Partitioner {
        private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();

        public int partition(String str, Object obj, byte[] bArr, Object obj2, byte[] bArr2, Cluster cluster) {
            return LONG_DESERIALIZER.deserialize(str, bArr).intValue() % EosV2UpgradeIntegrationTest.NUM_TOPIC_PARTITIONS;
        }

        public void close() {
        }

        public void configure(Map<String, ?> map) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest$TestKafkaClientSupplier.class */
    public class TestKafkaClientSupplier extends DefaultKafkaClientSupplier {
        private TestKafkaClientSupplier() {
        }

        public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
            return new ErrorInjector(map);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<Boolean[]> data() {
        return Arrays.asList(new Boolean[]{false}, new Boolean[]{true});
    }

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void createTopics() throws Exception {
        StringBuilder append = new StringBuilder().append("appId-");
        int i = this.testNumber + 1;
        this.testNumber = i;
        applicationId = append.append(i).toString();
        CLUSTER.deleteTopicsAndWait(MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, applicationId + "-store-changelog");
        CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
        CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
    }

    @Test
    public void shouldUpgradeFromEosAlphaToEosV2() throws Exception {
        Set<Long> mkSet;
        List<KeyValue<KafkaStreams.State, KafkaStreams.State>> linkedList = new LinkedList<>();
        KafkaStreams kafkaStreams = null;
        KafkaStreams kafkaStreams2 = null;
        KafkaStreams kafkaStreams3 = null;
        List<KeyValue<KafkaStreams.State, KafkaStreams.State>> linkedList2 = new LinkedList<>();
        KafkaStreams kafkaStreams4 = null;
        KafkaStreams kafkaStreams5 = null;
        KafkaStreams kafkaStreams6 = null;
        try {
            kafkaStreams = getKafkaStreams(APP_DIR_1, "exactly_once");
            kafkaStreams.setStateListener((state, state2) -> {
                linkedList.add(KeyValue.pair(state2, state));
            });
            this.assignmentListener.prepareForRebalance();
            kafkaStreams.cleanUp();
            kafkaStreams.start();
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            waitForRunning(linkedList);
            kafkaStreams4 = getKafkaStreams(APP_DIR_2, "exactly_once");
            kafkaStreams4.setStateListener((state3, state4) -> {
                linkedList2.add(KeyValue.pair(state4, state3));
            });
            linkedList.clear();
            this.assignmentListener.prepareForRebalance();
            kafkaStreams4.cleanUp();
            kafkaStreams4.start();
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            waitForRunning(linkedList);
            waitForRunning(linkedList2);
            List<KeyValue<Long, Long>> prepareData = prepareData(0L, 10L, 0L, 1L, 2L, 3L);
            writeInputData(prepareData);
            TestUtils.waitForCondition(() -> {
                return this.commitRequested.get() == NUM_TOPIC_PARTITIONS;
            }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
            HashMap hashMap = new HashMap();
            List<KeyValue<Long, Long>> computeExpectedResult = computeExpectedResult(prepareData, hashMap);
            verifyCommitted(computeExpectedResult);
            Set mkSet2 = Utils.mkSet(new Long[]{0L, 1L, 2L, 3L});
            Set<Long> keysFromInstance = keysFromInstance(kafkaStreams);
            long longValue = keysFromInstance.iterator().next().longValue();
            mkSet2.remove(Long.valueOf(longValue));
            List<KeyValue<Long, Long>> linkedList3 = new LinkedList<>();
            HashMap hashMap2 = new HashMap(hashMap);
            if (this.injectError) {
                List<KeyValue<Long, Long>> linkedList4 = new LinkedList<>();
                Iterator it = mkSet2.iterator();
                while (it.hasNext()) {
                    linkedList4.addAll(prepareData(10L, 15L, Long.valueOf(((Long) it.next()).longValue())));
                }
                linkedList4.addAll(prepareData(10L, 14L, Long.valueOf(longValue)));
                linkedList3.addAll(linkedList4);
                writeInputData(linkedList4);
                computeExpectedResult.addAll(computeExpectedResult(linkedList4, new HashMap<>(hashMap)));
                verifyUncommitted(computeExpectedResult);
            } else {
                linkedList3.addAll(prepareData(10L, 15L, 0L, 1L, 2L, 3L));
                writeInputData(linkedList3);
                computeExpectedResult.addAll(computeExpectedResult(linkedList3, hashMap2));
                verifyUncommitted(computeExpectedResult);
            }
            linkedList2.clear();
            this.assignmentListener.prepareForRebalance();
            if (this.injectError) {
                this.errorInjectedClient1.set(true);
                List<KeyValue<Long, Long>> prepareData2 = prepareData(14L, 15L, Long.valueOf(longValue));
                linkedList3.addAll(prepareData2);
                writeInputData(prepareData2);
            } else {
                linkedList.clear();
                kafkaStreams.close();
                waitForStateTransition(linkedList, CLOSE);
            }
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            waitForRunning(linkedList2);
            if (this.injectError) {
                computeExpectedResult.addAll(computeExpectedResult((List) linkedList3.stream().filter(keyValue -> {
                    return keysFromInstance.contains(keyValue.key);
                }).collect(Collectors.toList()), new HashMap<>(hashMap)));
                verifyUncommitted(computeExpectedResult);
                waitForStateTransitionContains(linkedList, CRASH);
                this.errorInjectedClient1.set(false);
                linkedList.clear();
                kafkaStreams.close();
                Assert.assertFalse(UNEXPECTED_EXCEPTION_MSG, this.hasUnexpectedError);
            } else {
                verifyCommitted(computeExpectedResult((List) linkedList3.stream().filter(keyValue2 -> {
                    return keysFromInstance.contains(keyValue2.key);
                }).collect(Collectors.toList()), hashMap));
            }
            this.commitRequested.set(0);
            linkedList.clear();
            linkedList2.clear();
            kafkaStreams2 = getKafkaStreams(APP_DIR_1, "exactly_once_v2");
            kafkaStreams2.setStateListener((state5, state6) -> {
                linkedList.add(KeyValue.pair(state6, state5));
            });
            this.assignmentListener.prepareForRebalance();
            kafkaStreams2.start();
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            waitForRunning(linkedList);
            waitForRunning(linkedList2);
            if (this.injectError) {
                mkSet = Utils.mkSet(new Long[]{0L, 1L, 2L, 3L});
            } else {
                mkSet = keysFromInstance(kafkaStreams2);
                mkSet.removeAll(keysFromInstance);
            }
            Set<Long> set = mkSet;
            verifyCommitted(computeExpectedResult((List) linkedList3.stream().filter(keyValue3 -> {
                return set.contains(keyValue3.key);
            }).collect(Collectors.toList()), hashMap));
            this.commitCounterClient1.set(0);
            if (this.injectError) {
                Set<Long> keysFromInstance2 = keysFromInstance(kafkaStreams2);
                Set<Long> keysFromInstance3 = keysFromInstance(kafkaStreams4);
                List<KeyValue<Long, Long>> prepareData3 = prepareData(15L, 20L, (Long[]) keysFromInstance2.toArray(new Long[0]));
                writeInputData(prepareData3);
                List<KeyValue<Long, Long>> computeExpectedResult2 = computeExpectedResult(prepareData3, hashMap);
                verifyCommitted(computeExpectedResult2);
                computeExpectedResult.addAll(computeExpectedResult2);
                this.commitCounterClient2.set(0);
                Iterator<Long> it2 = keysFromInstance3.iterator();
                Long next = it2.next();
                Long next2 = it2.next();
                List<KeyValue<Long, Long>> prepareData4 = prepareData(15L, 19L, (Long[]) keysFromInstance3.toArray(new Long[0]));
                prepareData4.addAll(prepareData(19L, 20L, next));
                writeInputData(prepareData4);
                hashMap2.putAll(hashMap);
                computeExpectedResult.addAll(computeExpectedResult(prepareData4, hashMap2));
                verifyUncommitted(computeExpectedResult);
                linkedList.clear();
                linkedList2.clear();
                this.assignmentListener.prepareForRebalance();
                this.commitCounterClient1.set(0);
                this.commitErrorInjectedClient2.set(true);
                List<KeyValue<Long, Long>> prepareData5 = prepareData(19L, 20L, next2);
                prepareData4.addAll(prepareData5);
                writeInputData(prepareData5);
                computeExpectedResult.addAll(computeExpectedResult(prepareData5, hashMap2));
                verifyUncommitted(computeExpectedResult);
                this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
                waitForStateTransitionContains(linkedList2, CRASH);
                this.commitErrorInjectedClient2.set(false);
                linkedList2.clear();
                kafkaStreams4.close();
                Assert.assertFalse(UNEXPECTED_EXCEPTION_MSG, this.hasUnexpectedError);
                List<KeyValue<Long, Long>> computeExpectedResult3 = computeExpectedResult(prepareData4, hashMap);
                verifyCommitted(computeExpectedResult3);
                computeExpectedResult.addAll(computeExpectedResult3);
            } else {
                List<KeyValue<Long, Long>> prepareData6 = prepareData(15L, 20L, 0L, 1L, 2L, 3L);
                writeInputData(prepareData6);
                Set<Long> set2 = mkSet;
                List<KeyValue<Long, Long>> list = (List) linkedList3.stream().filter(keyValue4 -> {
                    return !keysFromInstance.contains(keyValue4.key);
                }).filter(keyValue5 -> {
                    return !set2.contains(keyValue5.key);
                }).collect(Collectors.toList());
                list.addAll(prepareData6);
                computeExpectedResult.addAll(computeExpectedResult(prepareData6, hashMap2));
                verifyCommitted(computeExpectedResult(list, hashMap));
            }
            if (this.injectError) {
                this.commitCounterClient1.set(0);
                this.commitCounterClient2.set(-1);
                linkedList.clear();
                linkedList2.clear();
                kafkaStreams5 = getKafkaStreams(APP_DIR_2, "exactly_once");
                kafkaStreams5.setStateListener((state7, state8) -> {
                    linkedList2.add(KeyValue.pair(state8, state7));
                });
                this.assignmentListener.prepareForRebalance();
                kafkaStreams5.start();
                this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
                waitForRunning(linkedList);
                waitForRunning(linkedList2);
                Set<Long> keysFromInstance4 = keysFromInstance(kafkaStreams2);
                List<KeyValue<Long, Long>> prepareData7 = prepareData(20L, 30L, (Long[]) keysFromInstance(kafkaStreams5).toArray(new Long[0]));
                writeInputData(prepareData7);
                List<KeyValue<Long, Long>> computeExpectedResult4 = computeExpectedResult(prepareData7, hashMap);
                verifyCommitted(computeExpectedResult4);
                computeExpectedResult.addAll(computeExpectedResult4);
                this.commitCounterClient2.set(0);
                Iterator<Long> it3 = keysFromInstance4.iterator();
                Long next3 = it3.next();
                Long next4 = it3.next();
                List<KeyValue<Long, Long>> prepareData8 = prepareData(20L, 29L, (Long[]) keysFromInstance4.toArray(new Long[0]));
                prepareData8.addAll(prepareData(29L, 30L, next3));
                writeInputData(prepareData8);
                hashMap2.putAll(hashMap);
                computeExpectedResult.addAll(computeExpectedResult(prepareData8, hashMap2));
                verifyUncommitted(computeExpectedResult);
                linkedList.clear();
                linkedList2.clear();
                this.assignmentListener.prepareForRebalance();
                this.commitCounterClient2.set(0);
                this.commitErrorInjectedClient1.set(true);
                List<KeyValue<Long, Long>> prepareData9 = prepareData(29L, 30L, next4);
                prepareData8.addAll(prepareData9);
                writeInputData(prepareData9);
                computeExpectedResult.addAll(computeExpectedResult(prepareData9, hashMap2));
                verifyUncommitted(computeExpectedResult);
                this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
                waitForStateTransitionContains(linkedList, CRASH);
                this.commitErrorInjectedClient1.set(false);
                linkedList.clear();
                kafkaStreams2.close();
                Assert.assertFalse(UNEXPECTED_EXCEPTION_MSG, this.hasUnexpectedError);
                List<KeyValue<Long, Long>> computeExpectedResult5 = computeExpectedResult(prepareData8, hashMap);
                verifyCommitted(computeExpectedResult5);
                computeExpectedResult.addAll(computeExpectedResult5);
                linkedList.clear();
                linkedList2.clear();
                kafkaStreams3 = getKafkaStreams(APP_DIR_1, "exactly_once_v2");
                kafkaStreams3.setStateListener((state9, state10) -> {
                    linkedList.add(KeyValue.pair(state10, state9));
                });
                this.assignmentListener.prepareForRebalance();
                kafkaStreams3.start();
                this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
                waitForRunning(linkedList);
                waitForRunning(linkedList2);
            } else {
                kafkaStreams5 = kafkaStreams4;
            }
            mkSet2.addAll(Utils.mkSet(new Long[]{0L, 1L, 2L, 3L}));
            Set<Long> keysFromInstance5 = keysFromInstance(kafkaStreams5);
            long longValue2 = keysFromInstance5.iterator().next().longValue();
            mkSet2.remove(Long.valueOf(longValue2));
            List<KeyValue<Long, Long>> linkedList5 = new LinkedList<>();
            if (this.injectError) {
                List<KeyValue<Long, Long>> linkedList6 = new LinkedList<>();
                Iterator it4 = mkSet2.iterator();
                while (it4.hasNext()) {
                    linkedList6.addAll(prepareData(30L, 35L, Long.valueOf(((Long) it4.next()).longValue())));
                }
                linkedList6.addAll(prepareData(30L, 34L, Long.valueOf(longValue2)));
                linkedList5.addAll(linkedList6);
                writeInputData(linkedList6);
                computeExpectedResult.addAll(computeExpectedResult(linkedList6, new HashMap<>(hashMap)));
                verifyUncommitted(computeExpectedResult);
            } else {
                linkedList5.addAll(prepareData(30L, 35L, 0L, 1L, 2L, 3L));
                writeInputData(linkedList5);
                computeExpectedResult.addAll(computeExpectedResult(linkedList5, new HashMap<>(hashMap)));
                verifyUncommitted(computeExpectedResult);
            }
            linkedList.clear();
            this.assignmentListener.prepareForRebalance();
            if (this.injectError) {
                this.errorInjectedClient2.set(true);
                List<KeyValue<Long, Long>> prepareData10 = prepareData(34L, 35L, Long.valueOf(longValue2));
                linkedList5.addAll(prepareData10);
                writeInputData(prepareData10);
            } else {
                linkedList2.clear();
                kafkaStreams5.close();
                waitForStateTransition(linkedList2, CLOSE);
            }
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            waitForRunning(linkedList);
            if (this.injectError) {
                computeExpectedResult.addAll(computeExpectedResult((List) linkedList5.stream().filter(keyValue6 -> {
                    return keysFromInstance5.contains(keyValue6.key);
                }).collect(Collectors.toList()), new HashMap<>(hashMap)));
                verifyUncommitted(computeExpectedResult);
                waitForStateTransitionContains(linkedList2, CRASH);
                this.errorInjectedClient2.set(false);
                linkedList2.clear();
                kafkaStreams5.close();
                Assert.assertFalse(UNEXPECTED_EXCEPTION_MSG, this.hasUnexpectedError);
            } else {
                verifyCommitted(computeExpectedResult((List) linkedList5.stream().filter(keyValue7 -> {
                    return keysFromInstance5.contains(keyValue7.key);
                }).collect(Collectors.toList()), hashMap));
            }
            this.commitRequested.set(0);
            linkedList.clear();
            linkedList2.clear();
            kafkaStreams6 = getKafkaStreams(APP_DIR_1, "exactly_once_v2");
            kafkaStreams6.setStateListener((state11, state12) -> {
                linkedList2.add(KeyValue.pair(state12, state11));
            });
            this.assignmentListener.prepareForRebalance();
            kafkaStreams6.start();
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            waitForRunning(linkedList);
            waitForRunning(linkedList2);
            mkSet.clear();
            if (this.injectError) {
                mkSet.addAll(Utils.mkSet(new Long[]{0L, 1L, 2L, 3L}));
            } else {
                mkSet.addAll(keysFromInstance(kafkaStreams6));
                mkSet.removeAll(keysFromInstance5);
            }
            Set<Long> set3 = mkSet;
            verifyCommitted(computeExpectedResult((List) linkedList5.stream().filter(keyValue8 -> {
                return set3.contains(keyValue8.key);
            }).collect(Collectors.toList()), hashMap));
            this.commitCounterClient1.set(-1);
            this.commitCounterClient2.set(-1);
            List<KeyValue<Long, Long>> prepareData11 = prepareData(35L, 40L, 0L, 1L, 2L, 3L);
            writeInputData(prepareData11);
            Set mkSet3 = Utils.mkSet(new Long[]{0L, 1L, 2L, 3L});
            mkSet3.removeAll(keysFromInstance5);
            mkSet3.removeAll(mkSet);
            List<KeyValue<Long, Long>> list2 = (List) linkedList5.stream().filter(keyValue9 -> {
                return mkSet3.contains(keyValue9.key);
            }).collect(Collectors.toList());
            list2.addAll(prepareData11);
            verifyCommitted(computeExpectedResult(list2, hashMap));
            if (kafkaStreams != null) {
                kafkaStreams.close();
            }
            if (kafkaStreams2 != null) {
                kafkaStreams2.close();
            }
            if (kafkaStreams3 != null) {
                kafkaStreams3.close();
            }
            if (kafkaStreams4 != null) {
                kafkaStreams4.close();
            }
            if (kafkaStreams5 != null) {
                kafkaStreams5.close();
            }
            if (kafkaStreams6 != null) {
                kafkaStreams6.close();
            }
        } catch (Throwable th) {
            if (kafkaStreams != null) {
                kafkaStreams.close();
            }
            if (kafkaStreams2 != null) {
                kafkaStreams2.close();
            }
            if (kafkaStreams3 != null) {
                kafkaStreams3.close();
            }
            if (kafkaStreams4 != null) {
                kafkaStreams4.close();
            }
            if (kafkaStreams5 != null) {
                kafkaStreams5.close();
            }
            if (kafkaStreams6 != null) {
                kafkaStreams6.close();
            }
            throw th;
        }
    }

    private KafkaStreams getKafkaStreams(String str, String str2) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.Long(), Serdes.Long()).withCachingEnabled());
        streamsBuilder.stream(MULTI_PARTITION_INPUT_TOPIC).transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() { // from class: org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.2
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Transformer<Long, Long, KeyValue<Long, Long>> m10get() {
                return new Transformer<Long, Long, KeyValue<Long, Long>>() { // from class: org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.2.1
                    ProcessorContext context;
                    KeyValueStore<Long, Long> state = null;
                    AtomicBoolean crash;
                    AtomicInteger sharedCommit;

                    public void init(ProcessorContext processorContext) {
                        this.context = processorContext;
                        this.state = processorContext.getStateStore("store");
                        if (EosV2UpgradeIntegrationTest.APP_DIR_1.equals(processorContext.appConfigs().get("client.id").toString())) {
                            this.crash = EosV2UpgradeIntegrationTest.this.errorInjectedClient1;
                            this.sharedCommit = EosV2UpgradeIntegrationTest.this.commitCounterClient1;
                        } else {
                            this.crash = EosV2UpgradeIntegrationTest.this.errorInjectedClient2;
                            this.sharedCommit = EosV2UpgradeIntegrationTest.this.commitCounterClient2;
                        }
                    }

                    public KeyValue<Long, Long> transform(Long l, Long l2) {
                        if ((l2.longValue() + 1) % 10 == 0) {
                            if (this.sharedCommit.get() < 0 || this.sharedCommit.incrementAndGet() == 2) {
                                this.context.commit();
                            }
                            EosV2UpgradeIntegrationTest.this.commitRequested.incrementAndGet();
                        }
                        Long l3 = (Long) this.state.get(l);
                        this.state.put(l, l3 == null ? l2 : Long.valueOf(l3.longValue() + l2.longValue()));
                        this.state.flush();
                        if (l2.longValue() % 10 == 4 && this.crash != null && this.crash.compareAndSet(true, false)) {
                            throw new RuntimeException("Injected test exception.");
                        }
                        return new KeyValue<>(l, this.state.get(l));
                    }

                    public void close() {
                    }
                };
            }
        }, new String[]{"store"}).to(MULTI_PARTITION_OUTPUT_TOPIC);
        Properties properties = new Properties();
        properties.put("client.id", str);
        properties.put("processing.guarantee", str2);
        long millis = Duration.ofMinutes(1L).toMillis();
        properties.put("commit.interval.ms", Long.valueOf(millis));
        properties.put(StreamsConfig.consumerPrefix("metadata.max.age.ms"), Long.valueOf(Duration.ofSeconds(1L).toMillis()));
        properties.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");
        properties.put(StreamsConfig.consumerPrefix("request.timeout.ms"), Integer.valueOf((int) Duration.ofSeconds(5L).toMillis()));
        properties.put(StreamsConfig.consumerPrefix("session.timeout.ms"), Integer.valueOf((int) Duration.ofSeconds(5L).minusMillis(1L).toMillis()));
        properties.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), Integer.valueOf(MAX_POLL_INTERVAL_MS));
        properties.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), Integer.valueOf((int) millis));
        properties.put(StreamsConfig.producerPrefix("partitioner.class"), KeyPartitioner.class);
        properties.put("cache.max.bytes.buffering", 0);
        properties.put("state.dir", TestUtils.tempDirectory().getPath() + File.separator + str);
        properties.put("__assignment.listener__", this.assignmentListener);
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), StreamsTestUtils.getStreamsConfig(applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), properties), new TestKafkaClientSupplier());
        kafkaStreams.setUncaughtExceptionHandler(th -> {
            if (this.injectError) {
                int intValue = this.exceptionCounts.get(str).intValue() + 1;
                if (intValue > 2 || !(th instanceof RuntimeException) || !th.getMessage().contains("test exception")) {
                    th.printStackTrace(System.err);
                    this.hasUnexpectedError = true;
                }
                this.exceptionCounts.put(str, Integer.valueOf(intValue));
            } else {
                th.printStackTrace(System.err);
                this.hasUnexpectedError = true;
            }
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        return kafkaStreams;
    }

    private void waitForRunning(List<KeyValue<KafkaStreams.State, KafkaStreams.State>> list) throws Exception {
        TestUtils.waitForCondition(() -> {
            return !list.isEmpty() && ((KafkaStreams.State) ((KeyValue) list.get(list.size() - 1)).value).equals(KafkaStreams.State.RUNNING);
        }, MAX_WAIT_TIME_MS, () -> {
            return "Client did not startup on time. Observers transitions: " + list;
        });
    }

    private void waitForStateTransition(List<KeyValue<KafkaStreams.State, KafkaStreams.State>> list, List<KeyValue<KafkaStreams.State, KafkaStreams.State>> list2) throws Exception {
        TestUtils.waitForCondition(() -> {
            return list.equals(list2);
        }, MAX_WAIT_TIME_MS, () -> {
            return "Client did not have the expected state transition on time. Observers transitions: " + list + "Expected transitions: " + list2;
        });
    }

    private void waitForStateTransitionContains(List<KeyValue<KafkaStreams.State, KafkaStreams.State>> list, List<KeyValue<KafkaStreams.State, KafkaStreams.State>> list2) throws Exception {
        TestUtils.waitForCondition(() -> {
            return list.containsAll(list2);
        }, MAX_WAIT_TIME_MS, () -> {
            return "Client did not have the expected state transition on time. Observers transitions: " + list + "Expected transitions: " + list2;
        });
    }

    private List<KeyValue<Long, Long>> prepareData(long j, long j2, Long... lArr) {
        ArrayList arrayList = new ArrayList();
        for (Long l : lArr) {
            long j3 = j;
            while (true) {
                long j4 = j3;
                if (j4 < j2) {
                    arrayList.add(new KeyValue(l, Long.valueOf(j4)));
                    j3 = j4 + 1;
                }
            }
        }
        return arrayList;
    }

    private void writeInputData(List<KeyValue<Long, Long>> list) {
        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class);
        producerConfig.setProperty("partitioner.class", KeyPartitioner.class.getName());
        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC, list, producerConfig, CLUSTER.time);
    }

    private void verifyCommitted(List<KeyValue<Long, Long>> list) throws Exception {
        checkResultPerKey(readResult(list.size(), true), list);
    }

    private void verifyUncommitted(List<KeyValue<Long, Long>> list) throws Exception {
        checkResultPerKey(readResult(list.size(), false), list);
    }

    private List<KeyValue<Long, Long>> readResult(int i, boolean z) throws Exception {
        return z ? IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), CONSUMER_GROUP_ID, LongDeserializer.class, LongDeserializer.class, Utils.mkProperties(Collections.singletonMap("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))), MULTI_PARTITION_OUTPUT_TOPIC, i, MAX_WAIT_TIME_MS) : IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class), MULTI_PARTITION_OUTPUT_TOPIC, i);
    }

    private void checkResultPerKey(List<KeyValue<Long, Long>> list, List<KeyValue<Long, Long>> list2) {
        HashSet hashSet = new HashSet();
        addAllKeys(hashSet, list);
        addAllKeys(hashSet, list2);
        for (Long l : hashSet) {
            try {
                MatcherAssert.assertThat(getAllRecordPerKey(l, list), CoreMatchers.equalTo(getAllRecordPerKey(l, list2)));
            } catch (AssertionError e) {
                throw new AssertionError("expected result: " + ((String) list2.stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.joining(", "))) + "\nreceived records: " + ((String) list.stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.joining(", "))), e);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void addAllKeys(Set<Long> set, List<KeyValue<Long, Long>> list) {
        Iterator<KeyValue<Long, Long>> it = list.iterator();
        while (it.hasNext()) {
            set.add(it.next().key);
        }
    }

    private List<KeyValue<Long, Long>> getAllRecordPerKey(Long l, List<KeyValue<Long, Long>> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (KeyValue<Long, Long> keyValue : list) {
            if (((Long) keyValue.key).equals(l)) {
                arrayList.add(keyValue);
            }
        }
        return arrayList;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private List<KeyValue<Long, Long>> computeExpectedResult(List<KeyValue<Long, Long>> list, Map<Long, Long> map) {
        ArrayList arrayList = new ArrayList(list.size());
        for (KeyValue<Long, Long> keyValue : list) {
            long longValue = ((Long) map.getOrDefault(keyValue.key, 0L)).longValue();
            map.put(keyValue.key, Long.valueOf(longValue + ((Long) keyValue.value).longValue()));
            arrayList.add(new KeyValue(keyValue.key, Long.valueOf(longValue + ((Long) keyValue.value).longValue())));
        }
        return arrayList;
    }

    private Set<Long> keysFromInstance(KafkaStreams kafkaStreams) throws Exception {
        HashSet hashSet = new HashSet();
        TestUtils.waitForCondition(() -> {
            ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) kafkaStreams.store(StoreQueryParameters.fromNameAndType("store", QueryableStoreTypes.keyValueStore()));
            hashSet.clear();
            KeyValueIterator all = readOnlyKeyValueStore.all();
            Throwable th = null;
            while (all.hasNext()) {
                try {
                    try {
                        hashSet.add(((KeyValue) all.next()).key);
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (all != null) {
                        if (th != null) {
                            try {
                                all.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            all.close();
                        }
                    }
                    throw th3;
                }
            }
            if (all == null) {
                return true;
            }
            if (0 == 0) {
                all.close();
                return true;
            }
            try {
                all.close();
                return true;
            } catch (Throwable th5) {
                th.addSuppressed(th5);
                return true;
            }
        }, MAX_WAIT_TIME_MS, "Could not get keys from store: store");
        return hashSet;
    }
}
