package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(600)
@Tags({@Tag("integration"), @Tag("bazel:shard_count:3")})
/* loaded from: input_file:org/apache/kafka/streams/integration/RestoreIntegrationTest.class */
public class RestoreIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private String appId;
    private String inputStream;
    private KafkaStreams kafkaStreams;
    private static final Logger log = LoggerFactory.getLogger(RestoreIntegrationTest.class);
    private static final Duration RESTORATION_DELAY = Duration.ofSeconds(1);
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final int numberOfKeys = 10000;
    private final List<Properties> streamsConfigurations = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/RestoreIntegrationTest$CloseCountingInMemoryStore.class */
    public static class CloseCountingInMemoryStore extends InMemoryKeyValueStore {
        static AtomicInteger numStoresClosed = new AtomicInteger(0);

        CloseCountingInMemoryStore(String str) {
            super(str);
        }

        public void close() {
            numStoresClosed.incrementAndGet();
            super.close();
        }

        static int numStoresClosed() {
            return numStoresClosed.get();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/RestoreIntegrationTest$KeyValueStoreProcessor.class */
    public static class KeyValueStoreProcessor implements Processor<Integer, Integer, Void, Void> {
        private final String topic;
        private final CountDownLatch processorLatch;
        private KeyValueStore<Integer, Integer> store;

        KeyValueStoreProcessor(String str, CountDownLatch countDownLatch) {
            this.topic = str;
            this.processorLatch = countDownLatch;
        }

        public void init(ProcessorContext<Void, Void> processorContext) {
            this.store = processorContext.getStateStore(this.topic);
        }

        public void process(Record<Integer, Integer> record) {
            if (record.key() != null) {
                this.store.put(record.key(), record.value());
                this.processorLatch.countDown();
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/RestoreIntegrationTest$TestStateRestoreListener.class */
    private static final class TestStateRestoreListener implements StateRestoreListener {
        private final String instanceName;
        private final Duration onBatchRestoredSleepDuration;
        private final CountDownLatch onRestoreStartLatch = new CountDownLatch(1);
        private final CountDownLatch onRestoreEndLatch = new CountDownLatch(1);
        private final CountDownLatch onRestoreSuspendedLatch = new CountDownLatch(1);
        private final CountDownLatch onBatchRestoredLatch = new CountDownLatch(1);

        TestStateRestoreListener(String str, Duration duration) {
            this.onBatchRestoredSleepDuration = duration;
            this.instanceName = str;
        }

        boolean awaitUntilRestorationStarts() throws InterruptedException {
            return awaitLatchWithTimeout(this.onRestoreStartLatch);
        }

        boolean awaitUntilRestorationSuspends() throws InterruptedException {
            return awaitLatchWithTimeout(this.onRestoreSuspendedLatch);
        }

        boolean awaitUntilRestorationEnds() throws InterruptedException {
            return awaitLatchWithTimeout(this.onRestoreEndLatch);
        }

        public boolean awaitUntilBatchRestoredIsCalled() throws InterruptedException {
            return awaitLatchWithTimeout(this.onBatchRestoredLatch);
        }

        public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            RestoreIntegrationTest.log.info("[{}] called onRestoreStart. topicPartition={}, storeName={}, startingOffset={}, endingOffset={}", new Object[]{this.instanceName, topicPartition, str, Long.valueOf(j), Long.valueOf(j2)});
            this.onRestoreStartLatch.countDown();
        }

        public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
            RestoreIntegrationTest.log.info("[{}] called onBatchRestored. topicPartition={}, storeName={}, batchEndOffset={}, numRestored={}", new Object[]{this.instanceName, topicPartition, str, Long.valueOf(j), Long.valueOf(j2)});
            Utils.sleep(this.onBatchRestoredSleepDuration.toMillis());
            this.onBatchRestoredLatch.countDown();
        }

        public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
            RestoreIntegrationTest.log.info("[{}] called onRestoreEnd. topicPartition={}, storeName={}, totalRestored={}", new Object[]{this.instanceName, topicPartition, str, Long.valueOf(j)});
            this.onRestoreEndLatch.countDown();
        }

        public void onRestoreSuspended(TopicPartition topicPartition, String str, long j) {
            RestoreIntegrationTest.log.info("[{}] called onRestoreSuspended. topicPartition={}, storeName={}, totalRestored={}", new Object[]{this.instanceName, topicPartition, str, Long.valueOf(j)});
            this.onRestoreSuspendedLatch.countDown();
        }

        private static boolean awaitLatchWithTimeout(CountDownLatch countDownLatch) throws InterruptedException {
            return countDownLatch.await(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        }
    }

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void createTopics(TestInfo testInfo) throws InterruptedException {
        this.appId = IntegrationTestUtils.safeUniqueTestName(testInfo);
        this.inputStream = this.appId + "-input-stream";
        CLUSTER.createTopic(this.inputStream, 2, 1);
    }

    private Properties props(boolean z) {
        return props(Utils.mkObjectProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("__state.updater.enabled__", Boolean.valueOf(z))})));
    }

    private Properties props(Properties properties) {
        Properties properties2 = new Properties();
        properties2.put("application.id", this.appId);
        properties2.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties2.put("statestore.cache.max.bytes", 0);
        properties2.put("state.dir", TestUtils.tempDirectory(this.appId).getPath());
        properties2.put("default.key.serde", Serdes.Integer().getClass());
        properties2.put("default.value.serde", Serdes.Integer().getClass());
        properties2.put("commit.interval.ms", 1000L);
        properties2.put("auto.offset.reset", "earliest");
        properties2.putAll(properties);
        this.streamsConfigurations.add(properties2);
        return properties2;
    }

    @AfterEach
    public void shutdown() throws Exception {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close(Duration.ofSeconds(30L));
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfigurations);
        this.streamsConfigurations.clear();
    }

    private static Stream<Boolean> parameters() {
        return Stream.of((Object[]) new Boolean[]{Boolean.TRUE, Boolean.FALSE});
    }

    @Test
    public void shouldRestoreNullRecord() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig("restoration-test-app", CLUSTER.bootstrapServers(), Serdes.Integer().getClass().getName(), Serdes.ByteArray().getClass().getName(), new Properties());
        CLUSTER.createTopics("input");
        CLUSTER.createTopics("output");
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
        streamsBuilder.table("input", Materialized.as(Stores.persistentTimestampedKeyValueStore("stateStore")).withKeySerde(Serdes.Integer()).withValueSerde(Serdes.Bytes()).withCachingDisabled()).toStream().to("output");
        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, BytesSerializer.class);
        List asList = Arrays.asList(KeyValue.pair(3, new Bytes(new byte[]{3})), KeyValue.pair(3, (Object) null), KeyValue.pair(1, new Bytes(new byte[]{1})));
        IntegrationTestUtils.produceKeyValuesSynchronously("input", asList, producerConfig, new MockTime());
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(streamsConfig), streamsConfig);
        kafkaStreams.start();
        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, BytesDeserializer.class);
        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, "output", asList);
        kafkaStreams.close();
        kafkaStreams.cleanUp();
        List singletonList = Collections.singletonList(KeyValue.pair(2, new Bytes(new byte[3])));
        IntegrationTestUtils.produceKeyValuesSynchronously("input", singletonList, producerConfig, new MockTime());
        KafkaStreams kafkaStreams2 = new KafkaStreams(streamsBuilder.build(streamsConfig), streamsConfig);
        kafkaStreams2.start();
        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, "output", singletonList);
        kafkaStreams2.close();
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldRestoreStateFromSourceTopic(boolean z) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties props = props(z);
        props.put("topology.optimization", "all");
        createStateForRestoration(this.inputStream, 0);
        setCommittedOffset(this.inputStream, 1000);
        StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true, false);
        new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)), ".checkpoint")).write(Collections.singletonMap(new TopicPartition(this.inputStream, 0), 999L));
        new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)), ".checkpoint")).write(Collections.singletonMap(new TopicPartition(this.inputStream, 1), 999L));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        streamsBuilder.table(this.inputStream, Materialized.as("store").withKeySerde(Serdes.Integer()).withValueSerde(Serdes.Integer())).toStream().foreach((num, num2) -> {
            if (atomicInteger.incrementAndGet() == 2000) {
                countDownLatch2.countDown();
            }
        });
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(props), props);
        this.kafkaStreams.setStateListener((state, state2) -> {
            if (state == KafkaStreams.State.RUNNING && state2 == KafkaStreams.State.REBALANCING) {
                countDownLatch.countDown();
            }
        });
        final AtomicLong atomicLong = new AtomicLong(0L);
        this.kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() { // from class: org.apache.kafka.streams.integration.RestoreIntegrationTest.1
            public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            }

            public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
            }

            public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
                atomicLong.addAndGet(j);
            }
        });
        this.kafkaStreams.start();
        Assertions.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(Long.valueOf(atomicLong.get()), IsEqual.equalTo(6000L));
        Assertions.assertTrue(countDownLatch2.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(Integer.valueOf(atomicInteger.get()), IsEqual.equalTo(2000));
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldRestoreStateFromChangelogTopic(boolean z) throws Exception {
        String str = this.appId + "-store-changelog";
        CLUSTER.createTopic(str, 2, 1);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties props = props(z);
        createStateForRestoration(str, 0);
        createStateForRestoration(this.inputStream, 10000);
        StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true, false);
        new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)), ".checkpoint")).write(Collections.singletonMap(new TopicPartition(str, 0), 999L));
        new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)), ".checkpoint")).write(Collections.singletonMap(new TopicPartition(str, 1), 999L));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        streamsBuilder.table(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store")).toStream().foreach((num, num2) -> {
            if (atomicInteger.incrementAndGet() == 10000) {
                countDownLatch2.countDown();
            }
        });
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.setStateListener((state, state2) -> {
            if (state == KafkaStreams.State.RUNNING && state2 == KafkaStreams.State.REBALANCING) {
                countDownLatch.countDown();
            }
        });
        final AtomicLong atomicLong = new AtomicLong(0L);
        this.kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() { // from class: org.apache.kafka.streams.integration.RestoreIntegrationTest.2
            public void onRestoreStart(TopicPartition topicPartition, String str2, long j, long j2) {
            }

            public void onBatchRestored(TopicPartition topicPartition, String str2, long j, long j2) {
            }

            public void onRestoreEnd(TopicPartition topicPartition, String str2, long j) {
                atomicLong.addAndGet(j);
            }
        });
        this.kafkaStreams.start();
        Assertions.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(Long.valueOf(atomicLong.get()), IsEqual.equalTo(8000L));
        Assertions.assertTrue(countDownLatch2.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(Integer.valueOf(atomicInteger.get()), IsEqual.equalTo(10000));
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldSuccessfullyStartWhenLoggingDisabled(boolean z) throws InterruptedException {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputStream).groupByKey().reduce((num, num2) -> {
            return Integer.valueOf(num.intValue() + num2.intValue());
        }, Materialized.as("reduce-store").withLoggingDisabled());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props(z));
        this.kafkaStreams.setStateListener((state, state2) -> {
            if (state == KafkaStreams.State.RUNNING && state2 == KafkaStreams.State.REBALANCING) {
                countDownLatch.countDown();
            }
        });
        this.kafkaStreams.start();
        Assertions.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldProcessDataFromStoresWithLoggingDisabled(boolean z) throws InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(this.inputStream, Arrays.asList(KeyValue.pair(1, 1), KeyValue.pair(2, 2), KeyValue.pair(3, 3)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), CLUSTER.time);
        StoreBuilder withLoggingDisabled = new KeyValueStoreBuilder(Stores.lruMap(this.inputStream, 10), Serdes.Integer(), Serdes.Integer(), CLUSTER.time).withLoggingDisabled();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(withLoggingDisabled);
        KStream stream = streamsBuilder.stream(this.inputStream);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        stream.process(() -> {
            return new KeyValueStoreProcessor(this.inputStream, countDownLatch);
        }, new String[]{this.inputStream});
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props(z));
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.kafkaStreams.setStateListener((state, state2) -> {
            if (state == KafkaStreams.State.RUNNING && state2 == KafkaStreams.State.REBALANCING) {
                countDownLatch2.countDown();
            }
        });
        this.kafkaStreams.start();
        countDownLatch2.await(30L, TimeUnit.SECONDS);
        Assertions.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(boolean z) throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as(getCloseCountingStore("store")));
        createStateForRestoration(this.inputStream, 0);
        Properties props = props(z);
        props.put("num.standby.replicas", 1);
        props.put("state.dir", TestUtils.tempDirectory(this.appId + "-1").getPath());
        IntegrationTestUtils.purgeLocalStreamsState(props);
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        Properties props2 = props(z);
        props2.put("num.standby.replicas", 1);
        props2.put("state.dir", TestUtils.tempDirectory(this.appId + "-2").getPath());
        IntegrationTestUtils.purgeLocalStreamsState(props2);
        KafkaStreams kafkaStreams2 = new KafkaStreams(streamsBuilder.build(), props2);
        Set<KafkaStreams.State> newSetFromMap = Collections.newSetFromMap(new ConcurrentHashMap());
        Set<KafkaStreams.State> newSetFromMap2 = Collections.newSetFromMap(new ConcurrentHashMap());
        IntegrationTestUtils.TrackingStateRestoreListener trackingStateRestoreListener = new IntegrationTestUtils.TrackingStateRestoreListener();
        IntegrationTestUtils.TrackingStandbyUpdateListener trackingStandbyUpdateListener = new IntegrationTestUtils.TrackingStandbyUpdateListener();
        kafkaStreams.setStandbyUpdateListener(trackingStandbyUpdateListener);
        kafkaStreams2.setStandbyUpdateListener(trackingStandbyUpdateListener);
        kafkaStreams.setGlobalStateRestoreListener(trackingStateRestoreListener);
        kafkaStreams.setStateListener((state, state2) -> {
            newSetFromMap.add(state);
        });
        kafkaStreams2.setStateListener((state3, state4) -> {
            newSetFromMap2.add(state3);
        });
        try {
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(kafkaStreams, kafkaStreams2), Duration.ofSeconds(60L));
            IntegrationTestUtils.waitForCompletion(kafkaStreams, 1, 30000L);
            IntegrationTestUtils.waitForCompletion(kafkaStreams2, 1, 30000L);
            IntegrationTestUtils.waitForStandbyCompletion(kafkaStreams, 1, 30000L);
            IntegrationTestUtils.waitForStandbyCompletion(kafkaStreams2, 1, 30000L);
        } catch (Exception e) {
            kafkaStreams.close();
            kafkaStreams2.close();
        }
        int numStoresClosed = CloseCountingInMemoryStore.numStoresClosed();
        long j = trackingStateRestoreListener.totalNumRestored();
        newSetFromMap.clear();
        newSetFromMap2.clear();
        try {
            kafkaStreams2.close();
            waitForTransitionTo(newSetFromMap2, KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60L));
            waitForTransitionTo(newSetFromMap, KafkaStreams.State.REBALANCING, Duration.ofSeconds(60L));
            waitForTransitionTo(newSetFromMap, KafkaStreams.State.RUNNING, Duration.ofSeconds(60L));
            IntegrationTestUtils.waitForCompletion(kafkaStreams, 1, 30000L);
            IntegrationTestUtils.waitForStandbyCompletion(kafkaStreams, 1, 30000L);
            MatcherAssert.assertThat(Long.valueOf(trackingStateRestoreListener.totalNumRestored()), CoreMatchers.equalTo(Long.valueOf(j)));
            MatcherAssert.assertThat(Integer.valueOf(CloseCountingInMemoryStore.numStoresClosed()), IsEqual.equalTo(Integer.valueOf(numStoresClosed + 2)));
            kafkaStreams.close();
            waitForTransitionTo(newSetFromMap, KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60L));
            if (z) {
                MatcherAssert.assertThat(Integer.valueOf(trackingStandbyUpdateListener.promotedPartitions.size()), CoreMatchers.equalTo(1));
            }
            MatcherAssert.assertThat(Integer.valueOf(CloseCountingInMemoryStore.numStoresClosed()), CoreMatchers.equalTo(Integer.valueOf(numStoresClosed + 4)));
        } catch (Throwable th) {
            kafkaStreams.close();
            throw th;
        }
    }

    @Test
    public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception {
        CLUSTER.createTopic("inputTopic", 5, 1);
        CLUSTER.createTopic("outputTopic", 5, 1);
        Map<String, Object> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("state.dir", TestUtils.tempDirectory(this.appId).getPath() + "-ks1"), Utils.mkEntry("client.id", this.appId + "-ks1"), Utils.mkEntry(StreamsConfig.restoreConsumerPrefix("max.poll.records"), 5)});
        Map<String, Object> mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("state.dir", TestUtils.tempDirectory(this.appId).getPath() + "-ks2"), Utils.mkEntry("client.id", this.appId + "-ks2"), Utils.mkEntry(StreamsConfig.restoreConsumerPrefix("max.poll.records"), 5)});
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("inputTopic", Consumed.with(Topology.AutoOffsetReset.EARLIEST)).groupByKey().reduce((obj, obj2) -> {
            return obj2;
        }).toStream().to("outputTopic");
        List<KeyValue<Integer, Integer>> list = (List) IntStream.range(0, 100).mapToObj(i -> {
            return new KeyValue(Integer.valueOf(i), Integer.valueOf(i));
        }).collect(Collectors.toList());
        sendEvents("inputTopic", list);
        this.kafkaStreams = startKafkaStreams(streamsBuilder, null, mkMap);
        validateReceivedMessages(list, "outputTopic");
        this.kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfigurations);
        TestStateRestoreListener testStateRestoreListener = new TestStateRestoreListener("ks1", RESTORATION_DELAY);
        this.kafkaStreams = startKafkaStreams(streamsBuilder, testStateRestoreListener, mkMap);
        Assertions.assertTrue(testStateRestoreListener.awaitUntilRestorationStarts());
        Assertions.assertTrue(testStateRestoreListener.awaitUntilBatchRestoredIsCalled());
        TestStateRestoreListener testStateRestoreListener2 = new TestStateRestoreListener("ks2", RESTORATION_DELAY);
        KafkaStreams startKafkaStreams = startKafkaStreams(streamsBuilder, testStateRestoreListener2, mkMap2);
        Throwable th = null;
        try {
            try {
                TestUtils.waitForCondition(() -> {
                    return KafkaStreams.State.RUNNING == startKafkaStreams.state();
                }, IntegrationTestUtils.DEFAULT_TIMEOUT, () -> {
                    return "kafkaStreams2 never transitioned to a RUNNING state.";
                });
                Assertions.assertTrue(testStateRestoreListener.awaitUntilRestorationSuspends());
                Assertions.assertTrue(testStateRestoreListener2.awaitUntilRestorationStarts());
                Assertions.assertTrue(testStateRestoreListener.awaitUntilRestorationEnds());
                Assertions.assertTrue(testStateRestoreListener2.awaitUntilRestorationEnds());
                if (startKafkaStreams != null) {
                    if (0 == 0) {
                        startKafkaStreams.close();
                        return;
                    }
                    try {
                        startKafkaStreams.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (startKafkaStreams != null) {
                if (th != null) {
                    try {
                        startKafkaStreams.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    startKafkaStreams.close();
                }
            }
            throw th4;
        }
    }

    private void validateReceivedMessages(List<KeyValue<Integer, Integer>> list, String str) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.setProperty("group.id", "group-" + this.appId);
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("key.deserializer", IntegerDeserializer.class.getName());
        properties.setProperty("value.deserializer", IntegerDeserializer.class.getName());
        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(properties, str, list);
    }

    private KafkaStreams startKafkaStreams(StreamsBuilder streamsBuilder, StateRestoreListener stateRestoreListener, Map<String, Object> map) {
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props(Utils.mkObjectProperties(map)));
        kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
        kafkaStreams.start();
        return kafkaStreams;
    }

    private void sendEvents(String str, List<KeyValue<Integer, Integer>> list) {
        IntegrationTestUtils.produceKeyValuesSynchronously(str, list, TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, new Properties()), CLUSTER.time);
    }

    private static KeyValueBytesStoreSupplier getCloseCountingStore(final String str) {
        return new KeyValueBytesStoreSupplier() { // from class: org.apache.kafka.streams.integration.RestoreIntegrationTest.3
            public String name() {
                return str;
            }

            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public KeyValueStore<Bytes, byte[]> m65get() {
                return new CloseCountingInMemoryStore(str);
            }

            public String metricsScope() {
                return "close-counting";
            }
        };
    }

    private void createStateForRestoration(String str, int i) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        KafkaProducer kafkaProducer = new KafkaProducer(properties, new IntegerSerializer(), new IntegerSerializer());
        Throwable th = null;
        for (int i2 = 0; i2 < 10000; i2++) {
            try {
                try {
                    int i3 = i + i2;
                    kafkaProducer.send(new ProducerRecord(str, Integer.valueOf(i3), Integer.valueOf(i3)));
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (kafkaProducer != null) {
                    if (th != null) {
                        try {
                            kafkaProducer.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        kafkaProducer.close();
                    }
                }
                throw th3;
            }
        }
        if (kafkaProducer != null) {
            if (0 == 0) {
                kafkaProducer.close();
                return;
            }
            try {
                kafkaProducer.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    private void setCommittedOffset(String str, int i) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("group.id", this.appId);
        properties.put("client.id", "commit-consumer");
        properties.put("key.deserializer", IntegerDeserializer.class);
        properties.put("value.deserializer", IntegerDeserializer.class);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        List<TopicPartition> asList = Arrays.asList(new TopicPartition(str, 0), new TopicPartition(str, 1));
        kafkaConsumer.assign(asList);
        kafkaConsumer.seekToEnd(asList);
        for (TopicPartition topicPartition : asList) {
            kafkaConsumer.seek(topicPartition, kafkaConsumer.position(topicPartition) - i);
        }
        kafkaConsumer.commitSync();
        kafkaConsumer.close();
    }

    private void waitForTransitionTo(Set<KafkaStreams.State> set, KafkaStreams.State state, Duration duration) throws Exception {
        TestUtils.waitForCondition(() -> {
            return set.contains(state);
        }, duration.toMillis(), () -> {
            return "Client did not transition to " + state + " on time. Observed transitions: " + set;
        });
    }
}
