package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.NoOpReadOnlyStore;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.class */
public class GlobalStateManagerImplTest {
    // Mock clock passed to the StateDirectory instances created by this fixture.
    private final MockTime time = new MockTime();
    // Restore callback that records every key/value pair handed to it.
    private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
    // Listener whose recorded offsets and store names are asserted in shouldListenForRestoreEvents.
    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    // Names of the four global stores; before() maps each one to the topic of the matching partition below.
    private final String storeName1 = "t1-store";
    private final String storeName2 = "t2-store";
    private final String storeName3 = "t3-store";
    private final String storeName4 = "t4-store";
    // One partition (number 1) per source topic.
    private final TopicPartition t1 = new TopicPartition("t1", 1);
    private final TopicPartition t2 = new TopicPartition("t2", 1);
    private final TopicPartition t3 = new TopicPartition("t3", 1);
    private final TopicPartition t4 = new TopicPartition("t4", 1);
    // Object under test; rebuilt in before() (and replaced by a few tests).
    private GlobalStateManagerImpl stateManager;
    // State directory shared by the manager; its global lock is released in after().
    private StateDirectory stateDirectory;
    private StreamsConfig streamsConfig;
    // store1 and store2 are created with the extra boolean flag set; store2 is a ConverterStore.
    private NoOpReadOnlyStore<Object, Object> store1;
    private NoOpReadOnlyStore<Object, Object> store2;
    // store3 and store4 are created with the single-argument constructor.
    private NoOpReadOnlyStore<Object, Object> store3;
    private NoOpReadOnlyStore<Object, Object> store4;
    // Mock consumer that tests prime with partitions, offsets and records.
    private MockConsumer<byte[], byte[]> consumer;
    // The ".checkpoint" file inside the state manager's base directory.
    private File checkpointFile;
    private ProcessorTopology topology;
    private InternalMockProcessorContext processorContext;

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest$ConverterStore.class */
    private class ConverterStore<K, V> extends NoOpReadOnlyStore<K, V> implements TimestampedBytesStore {
        ConverterStore(String str, boolean z) {
            super(str, z);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest$TheStateRestoreCallback.class */
    private static class TheStateRestoreCallback implements StateRestoreCallback {
        private final List<KeyValue<byte[], byte[]>> restored;

        private TheStateRestoreCallback() {
            this.restored = new ArrayList();
        }

        public void restore(byte[] bArr, byte[] bArr2) {
            this.restored.add(KeyValue.pair(bArr, bArr2));
        }
    }

    /**
     * Builds a {@link ProcessorTopology} in which every constructor argument is an empty
     * collection except the two values supplied by the caller.
     *
     * @param list stores passed as the fifth constructor argument
     *             (presumably the global state stores — confirm against ProcessorTopology)
     * @param map  passed as the sixth constructor argument; before() supplies a
     *             store-name to topic-name mapping here
     * @return the new topology
     */
    static ProcessorTopology withGlobalStores(List<StateStore> list, Map<String, String> map) {
        return new ProcessorTopology(
            Collections.emptyList(),
            Collections.emptyMap(),
            Collections.emptyMap(),
            Collections.emptyList(),
            list,
            map,
            Collections.emptySet()
        );
    }

    /**
     * Creates a fresh fixture before each test: four stores mapped to topics t1..t4, a topology
     * holding them, a StreamsConfig pointing at a new temp state directory, a mock consumer and
     * the {@link GlobalStateManagerImpl} under test with its processor context attached.
     */
    @Before
    public void before() {
        // Store name -> source topic.
        final Map<String, String> storeToTopic = new HashMap<>();
        storeToTopic.put(storeName1, t1.topic());
        storeToTopic.put(storeName2, t2.topic());
        storeToTopic.put(storeName3, t3.topic());
        storeToTopic.put(storeName4, t4.topic());

        store1 = new NoOpReadOnlyStore<>(storeName1, true);
        store2 = new ConverterStore<>(storeName2, true);
        store3 = new NoOpReadOnlyStore<>(storeName3);
        store4 = new NoOpReadOnlyStore<>(storeName4);
        topology = withGlobalStores(Arrays.asList(store1, store2, store3, store4), storeToTopic);

        // Plain Properties instead of an anonymous double-brace subclass.
        final Properties props = new Properties();
        props.put("application.id", "appId");
        props.put("bootstrap.servers", "dummy:1234");
        props.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfig = new StreamsConfig(props);

        stateDirectory = new StateDirectory(streamsConfig, time, true);
        consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
        stateManager = new GlobalStateManagerImpl(
            new LogContext("test"),
            topology,
            consumer,
            stateDirectory,
            stateRestoreListener,
            streamsConfig);
        processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
        stateManager.setGlobalProcessorContext(processorContext);
        checkpointFile = new File(stateManager.baseDir(), ".checkpoint");
    }

    /**
     * Releases the global state lock on the fixture's state directory after every test.
     *
     * @throws IOException if unlocking fails
     */
    @After
    public void after() throws IOException {
        stateDirectory.unlockGlobalState();
    }

    /** After initialize(), a ".lock" file must exist in the global state directory. */
    @Test
    public void shouldLockGlobalStateDirectory() {
        stateManager.initialize();

        final File lockFile = new File(stateDirectory.globalStateDir(), ".lock");
        Assert.assertTrue(lockFile.exists());
    }

    /**
     * A second StateDirectory takes the global lock first, so initialize() is expected to
     * fail with a LockException. The competing lock is always released afterwards.
     */
    @Test(expected = LockException.class)
    public void shouldThrowLockExceptionIfCantGetLock() throws IOException {
        final StateDirectory competingDirectory = new StateDirectory(streamsConfig, time, true);
        try {
            competingDirectory.lockGlobalState();
            stateManager.initialize();
        } finally {
            competingDirectory.unlockGlobalState();
        }
    }

    /** Offsets written to the checkpoint file must be reported by changelogOffsets() after initialize(). */
    @Test
    public void shouldReadCheckpointOffsets() throws IOException {
        final Map<TopicPartition, Long> expected = writeCheckpoint();

        stateManager.initialize();

        Assert.assertEquals(expected, stateManager.changelogOffsets());
    }

    /**
     * Writes a checkpoint that contains the four known partitions plus one for "oldTopic",
     * then expects initialize() to throw a StreamsException with the exact message below.
     */
    @Test
    public void shouldThrowStreamsExceptionForOldTopicPartitions() throws IOException {
        final Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
        checkpointedOffsets.put(t1, 1L);
        checkpointedOffsets.put(t2, 1L);
        checkpointedOffsets.put(t3, 1L);
        checkpointedOffsets.put(t4, 1L);
        // The extra entry that no store in the topology is mapped to.
        checkpointedOffsets.put(new TopicPartition("oldTopic", 1), 1L);
        new OffsetCheckpoint(checkpointFile).write(checkpointedOffsets);

        final StreamsException thrown =
            Assert.assertThrows(StreamsException.class, () -> stateManager.initialize());

        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.equalTo("Encountered a topic-partition not associated with any global state store"));
    }

    /** The checkpoint file must still be on disk once initialize() has read it. */
    @Test
    public void shouldNotDeleteCheckpointFileAfterLoaded() throws IOException {
        writeCheckpoint();

        stateManager.initialize();

        final boolean stillPresent = checkpointFile.exists();
        Assert.assertTrue(stillPresent);
    }

    /** A malformed checkpoint file must make initialize() throw a StreamsException. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws IOException {
        // Arrange: unparsable checkpoint content.
        writeCorruptCheckpoint();

        // Act: expected to throw.
        stateManager.initialize();
    }

    /** initialize() must mark store1 and store2 as initialized. */
    @Test
    public void shouldInitializeStateStores() {
        stateManager.initialize();

        final boolean firstInitialized = store1.initialized;
        final boolean secondInitialized = store2.initialized;
        Assert.assertTrue(firstInitialized);
        Assert.assertTrue(secondInitialized);
    }

    /** initialize() must return exactly the names of the four stores in the topology. */
    @Test
    public void shouldReturnInitializedStoreNames() {
        final String[] expectedNames = new String[] {storeName1, storeName2, storeName3, storeName4};

        Assert.assertEquals(Utils.mkSet(expectedNames), stateManager.initialize());
    }

    /**
     * Registering a store whose name is not part of the topology must be rejected with an
     * IllegalArgumentException.
     */
    @Test
    public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() {
        stateManager.initialize();
        final NoOpReadOnlyStore<Object, Object> unknownStore = new NoOpReadOnlyStore<>("not-in-topology");

        // assertThrows replaces the former try / fail / empty-catch construct.
        Assert.assertThrows(
            "should have raised an illegal argument exception as store is not in the topology",
            IllegalArgumentException.class,
            () -> stateManager.registerStore(unknownStore, stateRestoreCallback));
    }

    /**
     * The first registration of store1 succeeds; registering the same store again must be
     * rejected with an IllegalArgumentException.
     */
    @Test
    public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
        stateManager.initialize();
        initializeConsumer(2L, 0L, t1);
        stateManager.registerStore(store1, stateRestoreCallback);

        // assertThrows replaces the former try / fail / empty-catch construct.
        Assert.assertThrows(
            "should have raised an illegal argument exception as store has already been registered",
            IllegalArgumentException.class,
            () -> stateManager.registerStore(store1, stateRestoreCallback));
    }

    /**
     * The consumer is never primed with partitions for t1, so registering store1 must fail
     * with a StreamsException.
     */
    @Test
    public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
        stateManager.initialize();

        // assertThrows replaces the former try / fail / empty-catch construct.
        Assert.assertThrows(
            "Should have raised a StreamsException as there are no partition for the store",
            StreamsException.class,
            () -> stateManager.registerStore(store1, stateRestoreCallback));
    }

    /**
     * store1 is a plain NoOpReadOnlyStore, so the restored record must keep the original
     * sizes written by initializeConsumer: 3-byte key ("key") and 5-byte value ("value").
     */
    @Test
    public void shouldNotConvertValuesIfStoreDoesNotImplementTimestampedBytesStore() {
        initializeConsumer(1L, 0L, t1);
        stateManager.initialize();

        stateManager.registerStore(store1, stateRestoreCallback);

        // Typed access; no raw KeyValue or byte[] casts needed.
        final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
        Assert.assertEquals(3, restoredRecord.key.length);
        Assert.assertEquals(5, restoredRecord.value.length);
    }

    /**
     * Same as the unwrapped case, but store1 is registered through an anonymous
     * WrappedStateStore: the restored value must still be the original 5 bytes.
     */
    @Test
    public void shouldNotConvertValuesIfInnerStoreDoesNotImplementTimestampedBytesStore() {
        initializeConsumer(1L, 0L, t1);
        stateManager.initialize();
        final WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object> wrapper =
            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store1) {
            };

        stateManager.registerStore(wrapper, stateRestoreCallback);

        // Typed access; no raw KeyValue or byte[] casts needed.
        final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
        Assert.assertEquals(3, restoredRecord.key.length);
        Assert.assertEquals(5, restoredRecord.value.length);
    }

    /**
     * store2 is a ConverterStore (a TimestampedBytesStore), so the 5-byte value written by
     * initializeConsumer must come back as 13 bytes while the key stays at 3 bytes.
     * NOTE(review): the extra 8 bytes are presumably a timestamp prefix — confirm.
     */
    @Test
    public void shouldConvertValuesIfStoreImplementsTimestampedBytesStore() {
        initializeConsumer(1L, 0L, t2);
        stateManager.initialize();

        stateManager.registerStore(store2, stateRestoreCallback);

        // Typed access; no raw KeyValue or byte[] casts needed.
        final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
        Assert.assertEquals(3, restoredRecord.key.length);
        Assert.assertEquals(13, restoredRecord.value.length);
    }

    /**
     * Same as the unwrapped ConverterStore case, but store2 is registered through an anonymous
     * WrappedStateStore: the restored value must still be 13 bytes long.
     */
    @Test
    public void shouldConvertValuesIfInnerStoreImplementsTimestampedBytesStore() {
        initializeConsumer(1L, 0L, t2);
        stateManager.initialize();
        final WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object> wrapper =
            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store2) {
            };

        stateManager.registerStore(wrapper, stateRestoreCallback);

        // Typed access; no raw KeyValue or byte[] casts needed.
        final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
        Assert.assertEquals(3, restoredRecord.key.length);
        Assert.assertEquals(13, restoredRecord.value.length);
    }

    /** With two records available on t1 (offsets 0 and 1), registering store1 must restore both. */
    @Test
    public void shouldRestoreRecordsUpToHighwatermark() {
        initializeConsumer(2L, 0L, t1);
        stateManager.initialize();

        stateManager.registerStore(store1, stateRestoreCallback);

        final int restoredCount = stateRestoreCallback.restored.size();
        Assert.assertEquals(2L, restoredCount);
    }

    /**
     * Restoring five records that start at offset 1 must report start offset 1, end offset 6
     * and a total of 5 to the restore listener, and each of the start/batch/end events must
     * carry store1's name.
     */
    @Test
    public void shouldListenForRestoreEvents() {
        initializeConsumer(5L, 1L, t1);
        stateManager.initialize();

        stateManager.registerStore(store1, stateRestoreCallback);

        MatcherAssert.assertThat(stateRestoreListener.restoreStartOffset, CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(stateRestoreListener.restoreEndOffset, CoreMatchers.equalTo(6L));
        MatcherAssert.assertThat(stateRestoreListener.totalNumRestored, CoreMatchers.equalTo(5L));

        final String expectedStoreName = store1.name();
        MatcherAssert.assertThat(
            stateRestoreListener.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START),
            CoreMatchers.equalTo(expectedStoreName));
        MatcherAssert.assertThat(
            stateRestoreListener.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH),
            CoreMatchers.equalTo(expectedStoreName));
        MatcherAssert.assertThat(
            stateRestoreListener.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END),
            CoreMatchers.equalTo(expectedStoreName));
    }

    /**
     * With a checkpoint at offset 5 and five records at offsets 5..9, registering store1 must
     * restore exactly five records.
     */
    @Test
    public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws IOException {
        initializeConsumer(5L, 5L, t1);
        final File checkpoint = new File(stateManager.baseDir(), ".checkpoint");
        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(checkpoint);
        offsetCheckpoint.write(Collections.singletonMap(t1, 5L));

        stateManager.initialize();
        stateManager.registerStore(store1, stateRestoreCallback);

        Assert.assertEquals(5L, stateRestoreCallback.restored.size());
    }

    /** flush() on the manager must flush both registered stores. */
    @Test
    public void shouldFlushStateStores() {
        stateManager.initialize();
        // Register store1 on t1, then store2 on t2.
        initializeConsumer(1L, 0L, t1);
        stateManager.registerStore(store1, stateRestoreCallback);
        initializeConsumer(1L, 0L, t2);
        stateManager.registerStore(store2, stateRestoreCallback);

        stateManager.flush();

        final boolean firstFlushed = store1.flushed;
        Assert.assertTrue(firstFlushed);
        final boolean secondFlushed = store2.flushed;
        Assert.assertTrue(secondFlushed);
    }

    /**
     * Registers a store whose flush() throws a RuntimeException and expects the manager's
     * flush() to surface it as a ProcessorStateException.
     */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        // Explicit type arguments: the diamond is not allowed on anonymous classes before Java 9.
        final NoOpReadOnlyStore<Object, Object> failingStore = new NoOpReadOnlyStore<Object, Object>(store1.name()) {
            @Override
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.registerStore(failingStore, stateRestoreCallback);

        stateManager.flush();
    }

    /** close() on the manager must leave both registered stores closed. */
    @Test
    public void shouldCloseStateStores() throws IOException {
        stateManager.initialize();
        // Register store1 on t1, then store2 on t2.
        initializeConsumer(1L, 0L, t1);
        stateManager.registerStore(store1, stateRestoreCallback);
        initializeConsumer(1L, 0L, t2);
        stateManager.registerStore(store2, stateRestoreCallback);

        stateManager.close();

        final boolean firstOpen = store1.isOpen();
        Assert.assertFalse(firstOpen);
        final boolean secondOpen = store2.isOpen();
        Assert.assertFalse(secondOpen);
    }

    /**
     * Registers a store whose close() throws a RuntimeException and expects the manager's
     * close() to surface it as a ProcessorStateException.
     */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        // Explicit type arguments: the diamond is not allowed on anonymous classes before Java 9.
        final NoOpReadOnlyStore<Object, Object> failingStore = new NoOpReadOnlyStore<Object, Object>(store1.name()) {
            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.registerStore(failingStore, stateRestoreCallback);

        stateManager.close();
    }

    /** Registering a store with a null restore callback must be rejected with an IllegalArgumentException. */
    @Test
    public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
        stateManager.initialize();

        // assertThrows replaces the former try / fail / empty-catch construct.
        Assert.assertThrows(
            "should have thrown due to null callback",
            IllegalArgumentException.class,
            () -> stateManager.registerStore(store1, (StateRestoreCallback) null));
    }

    /**
     * After initialize() followed by close(), an independent StateDirectory over the same
     * config must be able to take the global state lock.
     */
    @Test
    public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
        stateManager.initialize();
        stateManager.close();

        final StateDirectory probeDirectory = new StateDirectory(streamsConfig, new MockTime(), true);
        try {
            final boolean lockAcquired = probeDirectory.lockGlobalState();
            Assert.assertTrue(lockAcquired);
        } finally {
            // Always release so later tests are not affected.
            probeDirectory.unlockGlobalState();
        }
    }

    /**
     * Registers a store that throws if close() is invoked while it is already closed, then
     * closes the manager twice. The test passes only if the second close() does not reach
     * the store again.
     */
    @Test
    public void shouldNotCloseStoresIfCloseAlreadyCalled() throws IOException {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        // Explicit type arguments: the diamond is not allowed on anonymous classes before Java 9.
        final NoOpReadOnlyStore<Object, Object> closeOnceStore = new NoOpReadOnlyStore<Object, Object>("t1-store") {
            @Override
            public void close() {
                if (!isOpen()) {
                    throw new RuntimeException("store already closed");
                }
                super.close();
            }
        };
        stateManager.registerStore(closeOnceStore, stateRestoreCallback);

        stateManager.close();
        stateManager.close();
    }

    /**
     * Registers one store whose close() closes itself and then throws, plus the regular store2.
     * Whatever close() on the manager reports, both stores must end up closed.
     */
    @Test
    public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws IOException {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        // Explicit type arguments: the diamond is not allowed on anonymous classes before Java 9.
        final NoOpReadOnlyStore<Object, Object> failingStore = new NoOpReadOnlyStore<Object, Object>("t1-store") {
            @Override
            public void close() {
                super.close();
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.registerStore(failingStore, stateRestoreCallback);
        initializeConsumer(1L, 0L, t2);
        stateManager.registerStore(store2, stateRestoreCallback);

        try {
            stateManager.close();
        } catch (final ProcessorStateException ignored) {
            // Deliberately ignored: the failure itself is not under test here, only that
            // every store was still closed (asserted below).
        }

        Assert.assertFalse(failingStore.isOpen());
        Assert.assertFalse(store2.isOpen());
    }

    /**
     * When initialize() fails on a corrupt checkpoint, the global state lock must not stay
     * held: an independent StateDirectory over the same config must be able to acquire it.
     */
    @Test
    public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws IOException {
        writeCorruptCheckpoint();
        try {
            stateManager.initialize();
        } catch (final StreamsException ignored) {
            // Any StreamsException is tolerated; only the lock state afterwards is checked.
        }

        final StateDirectory probeDirectory = new StateDirectory(streamsConfig, new MockTime(), true);
        try {
            final boolean lockAcquired = probeDirectory.lockGlobalState();
            Assert.assertTrue(lockAcquired);
        } finally {
            probeDirectory.unlockGlobalState();
        }
    }

    /**
     * checkpoint(offsets) must both persist the offsets to the checkpoint file and expose
     * them through changelogOffsets().
     */
    @Test
    public void shouldCheckpointOffsets() throws IOException {
        // Typed map (the decompiled form used a raw Map).
        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
        stateManager.initialize();

        stateManager.checkpoint(offsets);

        MatcherAssert.assertThat(readOffsetsCheckpoint(), CoreMatchers.equalTo(offsets));
        MatcherAssert.assertThat(stateManager.changelogOffsets(), CoreMatchers.equalTo(offsets));
    }

    /**
     * After both stores are registered, checkpointing a new offset for t1 only must update
     * t1 to 101 and leave the offset previously reported for t2 untouched.
     */
    @Test
    public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() {
        stateManager.initialize();
        initializeConsumer(10L, 0L, t1);
        stateManager.registerStore(store1, stateRestoreCallback);
        initializeConsumer(20L, 0L, t2);
        stateManager.registerStore(store2, stateRestoreCallback);

        // Typed maps (the decompiled form used raw Map locals).
        final Map<TopicPartition, Long> offsetsBefore = stateManager.changelogOffsets();
        stateManager.checkpoint(Collections.singletonMap(t1, 101L));
        final Map<TopicPartition, Long> offsetsAfter = stateManager.changelogOffsets();

        MatcherAssert.assertThat(offsetsAfter.get(t2), CoreMatchers.equalTo(offsetsBefore.get(t2)));
        MatcherAssert.assertThat(offsetsAfter.get(t1), CoreMatchers.equalTo(101L));
    }

    /**
     * Puts two records on t1 — one with a null key at offset 1 and one with key "key" at
     * offset 2 — and expects exactly one record to reach the restore callback.
     */
    @Test
    public void shouldSkipNullKeysWhenRestoring() {
        final Map<TopicPartition, Long> startOffsets = new HashMap<>();
        startOffsets.put(t1, 1L);
        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(t1, 3L);
        consumer.updatePartitions(
            t1.topic(),
            Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), (Node) null, (Node[]) null, (Node[]) null)));
        consumer.assign(Collections.singletonList(t1));
        consumer.updateEndOffsets(endOffsets);
        consumer.updateBeginningOffsets(startOffsets);
        // Record with a null key.
        consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 1L, (byte[]) null, "null".getBytes()));
        consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 2L, "key".getBytes(), "value".getBytes()));

        stateManager.initialize();
        stateManager.registerStore(store1, stateRestoreCallback);

        // The expected list is built from the first restored element, so this assertion
        // pins the list size to exactly one entry.
        final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0);
        MatcherAssert.assertThat(
            stateRestoreCallback.restored,
            CoreMatchers.equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value))));
    }

    /**
     * After restoring ten records for store1, an empty checkpoint() call followed by close()
     * must leave offset 10 for t1 both in changelogOffsets() and in the checkpoint file.
     */
    @Test
    public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
        stateManager.initialize();
        initializeConsumer(10L, 0L, t1);
        stateManager.registerStore(store1, stateRestoreCallback);

        stateManager.checkpoint(Collections.emptyMap());
        stateManager.close();

        // Typed map (the decompiled form used a raw Map).
        final Map<TopicPartition, Long> checkpointedOffsets = stateManager.changelogOffsets();
        MatcherAssert.assertThat(checkpointedOffsets, CoreMatchers.equalTo(Collections.singletonMap(t1, 10L)));
        MatcherAssert.assertThat(readOffsetsCheckpoint(), CoreMatchers.equalTo(checkpointedOffsets));
    }

    /**
     * store3 is built without the extra boolean flag; after registering it and closing the
     * manager, the checkpoint file must contain no offsets.
     */
    @Test
    public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
        stateManager.initialize();
        initializeConsumer(10L, 0L, t3);
        stateManager.registerStore(store3, stateRestoreCallback);

        stateManager.close();

        MatcherAssert.assertThat(
            readOffsetsCheckpoint(),
            CoreMatchers.equalTo(Collections.emptyMap()));
    }

    /**
     * Reads the ".checkpoint" file located in the state manager's base directory.
     *
     * @return the offsets stored in that file
     * @throws IOException if the file cannot be read
     */
    private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
        final File checkpoint = new File(stateManager.baseDir(), ".checkpoint");
        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(checkpoint);
        return offsetCheckpoint.read();
    }

    /**
     * Rebuilds the manager over a StateDirectory whose lockGlobalState() throws an IOException
     * and expects initialize() to report it as a LockException.
     */
    @Test
    public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
        final StateDirectory failingDirectory = new StateDirectory(streamsConfig, time, true) {
            @Override
            public boolean lockGlobalState() throws IOException {
                throw new IOException("KABOOM!");
            }
        };
        stateManager = new GlobalStateManagerImpl(
            new LogContext("mock"),
            topology,
            consumer,
            failingDirectory,
            stateRestoreListener,
            streamsConfig);

        // assertThrows replaces the former try / fail / empty-catch construct.
        Assert.assertThrows(
            "Should have thrown LockException",
            LockException.class,
            () -> stateManager.initialize());
    }

    /**
     * Replaces the consumer with one whose endOffsets() always throws a TimeoutException and
     * counts its invocations, configures "retries" = 2, and constructs a new manager. If the
     * construction throws a StreamsException, the call count must equal the retry count.
     *
     * <p>NOTE(review): the assertion only runs inside the catch block, so this test also
     * passes when no StreamsException is thrown at all — confirm whether that is intended.
     */
    @Test
    public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
        final int retries = 2;
        final AtomicInteger numberOfCalls = new AtomicInteger(0);
        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
                numberOfCalls.incrementAndGet();
                throw new TimeoutException();
            }
        };
        // Plain Properties instead of an anonymous double-brace subclass.
        final Properties props = new Properties();
        props.put("application.id", "appId");
        props.put("bootstrap.servers", "dummy:1234");
        props.put("state.dir", TestUtils.tempDirectory().getPath());
        props.put("retries", retries);
        streamsConfig = new StreamsConfig(props);

        try {
            new GlobalStateManagerImpl(
                new LogContext("mock"),
                topology,
                consumer,
                stateDirectory,
                stateRestoreListener,
                streamsConfig);
        } catch (final StreamsException expected) {
            // Expected value first, actual second (the original had them swapped).
            Assert.assertEquals(retries, numberOfCalls.get());
        }
    }

    /**
     * Replaces the consumer with one whose partitionsFor() always throws a TimeoutException and
     * counts its invocations, configures "retries" = 2, and constructs a new manager. If the
     * construction throws a StreamsException, the call count must equal the retry count.
     *
     * <p>NOTE(review): the assertion only runs inside the catch block, so this test also
     * passes when no StreamsException is thrown at all — confirm whether that is intended.
     */
    @Test
    public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
        final int retries = 2;
        final AtomicInteger numberOfCalls = new AtomicInteger(0);
        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public synchronized List<PartitionInfo> partitionsFor(final String topic) {
                numberOfCalls.incrementAndGet();
                throw new TimeoutException();
            }
        };
        // Plain Properties instead of an anonymous double-brace subclass.
        final Properties props = new Properties();
        props.put("application.id", "appId");
        props.put("bootstrap.servers", "dummy:1234");
        props.put("state.dir", TestUtils.tempDirectory().getPath());
        props.put("retries", retries);
        streamsConfig = new StreamsConfig(props);

        try {
            new GlobalStateManagerImpl(
                new LogContext("mock"),
                topology,
                consumer,
                stateDirectory,
                stateRestoreListener,
                streamsConfig);
        } catch (final StreamsException expected) {
            // Expected value first, actual second (the original had them swapped).
            Assert.assertEquals(retries, numberOfCalls.get());
        }
    }

    /**
     * Overwrites the ".checkpoint" file in the state manager's base directory with the text
     * {@code "0\n1\nfoo"}, which the corrupt-checkpoint tests expect initialize() to reject.
     *
     * @throws IOException if the file cannot be opened or written
     */
    private void writeCorruptCheckpoint() throws IOException {
        final File corruptCheckpoint = new File(stateManager.baseDir(), ".checkpoint");
        // try-with-resources closes the stream on both the normal and the failure path.
        try (OutputStream stream = Files.newOutputStream(corruptCheckpoint.toPath())) {
            stream.write("0\n1\nfoo".getBytes());
        }
    }

    /**
     * Primes the mock consumer for one partition: registers the partition, assigns it, sets
     * its beginning offset to {@code startOffset} and its end offset to
     * {@code startOffset + numRecords}, and adds {@code numRecords} records with key "key"
     * and value "value" at consecutive offsets beginning at {@code startOffset}.
     *
     * @param numRecords     number of records to add
     * @param startOffset    offset of the first record; also the beginning offset
     * @param topicPartition partition to prime
     */
    private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) {
        final Map<TopicPartition, Long> startOffsets = new HashMap<>();
        startOffsets.put(topicPartition, startOffset);
        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(topicPartition, startOffset + numRecords);

        final PartitionInfo partitionInfo =
            new PartitionInfo(topicPartition.topic(), topicPartition.partition(), (Node) null, (Node[]) null, (Node[]) null);
        consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.updateEndOffsets(endOffsets);
        consumer.updateBeginningOffsets(startOffsets);

        for (int i = 0; i < numRecords; i++) {
            consumer.addRecord(new ConsumerRecord<>(
                topicPartition.topic(),
                topicPartition.partition(),
                startOffset + i,
                "key".getBytes(),
                "value".getBytes()));
        }
    }

    /**
     * Writes a checkpoint file containing the single entry t1 -> 1.
     *
     * @return the offsets that were written
     * @throws IOException if the checkpoint cannot be written
     */
    private Map<TopicPartition, Long> writeCheckpoint() throws IOException {
        final Map<TopicPartition, Long> expectedOffsets = Collections.singletonMap(t1, 1L);
        new OffsetCheckpoint(checkpointFile).write(expectedOffsets);
        return expectedOffsets;
    }
}
