/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.NoOpReadOnlyStore;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link GlobalStateManagerImpl}.
 *
 * <p>Every test runs against a topology of four global stores ({@code t1-store} .. {@code t4-store}),
 * each mapped to a single-partition changelog topic ({@code t1} .. {@code t4}, partition 1), a
 * {@link MockConsumer}, and a {@link StateDirectory} rooted in a fresh temporary directory.
 * The first two stores are created with the {@code rocksdbStore} flag set; the second one
 * additionally implements {@link TimestampedBytesStore}.
 */
public class GlobalStateManagerImplTest {
    /** Name of the checkpoint file the tests read and write inside the state manager's base directory. */
    private static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    private final MockTime time = new MockTime();
    private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    private final String storeName1 = "t1-store";
    private final String storeName2 = "t2-store";
    private final String storeName3 = "t3-store";
    private final String storeName4 = "t4-store";
    private final TopicPartition t1 = new TopicPartition("t1", 1);
    private final TopicPartition t2 = new TopicPartition("t2", 1);
    private final TopicPartition t3 = new TopicPartition("t3", 1);
    private final TopicPartition t4 = new TopicPartition("t4", 1);
    private GlobalStateManagerImpl stateManager;
    private StateDirectory stateDirectory;
    private StreamsConfig streamsConfig;
    private NoOpReadOnlyStore<Object, Object> store1;
    private NoOpReadOnlyStore<Object, Object> store2;
    private NoOpReadOnlyStore<Object, Object> store3;
    private NoOpReadOnlyStore<Object, Object> store4;
    private MockConsumer<byte[], byte[]> consumer;
    private File checkpointFile;
    private ProcessorTopology topology;
    private InternalMockProcessorContext processorContext;

    /**
     * Builds a topology that contains nothing but the given global state stores.
     *
     * @param stateStores           the global stores of the topology
     * @param storeToChangelogTopic mapping from store name to the topic the store is restored from
     * @return a topology with no processors, sources or sinks
     */
    static ProcessorTopology withGlobalStores(List<StateStore> stateStores, Map<String, String> storeToChangelogTopic) {
        return new ProcessorTopology(
            Collections.emptyList(),
            Collections.emptyMap(),
            Collections.emptyMap(),
            Collections.emptyList(),
            stateStores,
            storeToChangelogTopic,
            Collections.emptySet());
    }

    /**
     * Returns the minimal Streams configuration used by these tests. Each call points
     * {@code state.dir} at a newly created temporary directory.
     */
    private static Properties newStreamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        return props;
    }

    /** Creates the stores, topology, consumer and the state manager under test. */
    @Before
    public void before() {
        final Map<String, String> storeToTopic = new HashMap<>();
        storeToTopic.put(storeName1, t1.topic());
        storeToTopic.put(storeName2, t2.topic());
        storeToTopic.put(storeName3, t3.topic());
        storeToTopic.put(storeName4, t4.topic());

        store1 = new NoOpReadOnlyStore<>(storeName1, true);
        store2 = new ConverterStore<>(storeName2, true);
        store3 = new NoOpReadOnlyStore<>(storeName3);
        store4 = new NoOpReadOnlyStore<>(storeName4);
        topology = withGlobalStores(Arrays.asList(store1, store2, store3, store4), storeToTopic);

        streamsConfig = new StreamsConfig(newStreamsProperties());
        stateDirectory = new StateDirectory(streamsConfig, time, true);
        consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
        stateManager = new GlobalStateManagerImpl(
            new LogContext("test"),
            topology,
            consumer,
            stateDirectory,
            stateRestoreListener,
            streamsConfig);
        processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
        stateManager.setGlobalProcessorContext(processorContext);
        checkpointFile = new File(stateManager.baseDir(), CHECKPOINT_FILE_NAME);
    }

    /** Releases the global state lock so that a lock taken by one test cannot leak into the next. */
    @After
    public void after() throws IOException {
        stateDirectory.unlockGlobalState();
    }

    @Test
    public void shouldLockGlobalStateDirectory() {
        stateManager.initialize();
        Assert.assertTrue(new File(stateDirectory.globalStateDir(), ".lock").exists());
    }

    /** A second {@link StateDirectory} already holds the lock, so initialization must fail. */
    @Test(expected=LockException.class)
    public void shouldThrowLockExceptionIfCantGetLock() throws IOException {
        final StateDirectory stateDir = new StateDirectory(streamsConfig, time, true);
        try {
            stateDir.lockGlobalState();
            stateManager.initialize();
        } finally {
            stateDir.unlockGlobalState();
        }
    }

    @Test
    public void shouldReadCheckpointOffsets() throws IOException {
        final Map<TopicPartition, Long> expected = writeCheckpoint();
        stateManager.initialize();
        final Map<TopicPartition, Long> offsets = stateManager.checkpointed();
        Assert.assertEquals(expected, offsets);
    }

    @Test
    public void shouldNotDeleteCheckpointFileAfterLoaded() throws IOException {
        writeCheckpoint();
        stateManager.initialize();
        Assert.assertTrue(checkpointFile.exists());
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws IOException {
        writeCorruptCheckpoint();
        stateManager.initialize();
    }

    @Test
    public void shouldInitializeStateStores() {
        stateManager.initialize();
        Assert.assertTrue(store1.initialized);
        Assert.assertTrue(store2.initialized);
    }

    @Test
    public void shouldReturnInitializedStoreNames() {
        final Set<String> storeNames = stateManager.initialize();
        Assert.assertEquals(Utils.mkSet(storeName1, storeName2, storeName3, storeName4), storeNames);
    }

    @Test
    public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() {
        stateManager.initialize();
        try {
            stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), stateRestoreCallback);
            Assert.fail("should have raised an illegal argument exception as store is not in the topology");
        } catch (final IllegalArgumentException expected) {
            // expected: the store is not part of the topology
        }
    }

    @Test
    public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
        stateManager.initialize();
        initializeConsumer(2L, 0L, t1);
        stateManager.register(store1, stateRestoreCallback);
        try {
            stateManager.register(store1, stateRestoreCallback);
            Assert.fail("should have raised an illegal argument exception as store has already been registered");
        } catch (final IllegalArgumentException expected) {
            // expected: the store was already registered above
        }
    }

    /** No partition metadata is given to the consumer, so registering the store must fail. */
    @Test
    public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
        stateManager.initialize();
        try {
            stateManager.register(store1, stateRestoreCallback);
            Assert.fail("Should have raised a StreamsException as there are no partition for the store");
        } catch (final StreamsException expected) {
            // expected: the consumer knows no partitions for the store's topic
        }
    }

    /** The restored value keeps the 5 bytes of {@code "value"} for a plain store. */
    @Test
    public void shouldNotConvertValuesIfStoreDoesNotImplementTimestampedBytesStore() {
        initializeConsumer(1L, 0L, t1);
        stateManager.initialize();
        stateManager.register(store1, stateRestoreCallback);

        final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
        Assert.assertEquals(3, restoredRecord.key.length);
        Assert.assertEquals(5, restoredRecord.value.length);
    }

    @Test
    public void shouldNotConvertValuesIfInnerStoreDoesNotImplementTimestampedBytesStore() {
        initializeConsumer(1L, 0L, t1);
        stateManager.initialize();
        stateManager.register(
            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store1) { },
            stateRestoreCallback);

        final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
        Assert.assertEquals(3, restoredRecord.key.length);
        Assert.assertEquals(5, restoredRecord.value.length);
    }

    /**
     * For a {@link TimestampedBytesStore} the restored value is 13 bytes instead of 5.
     * NOTE(review): presumably the extra 8 bytes are a timestamp prefix added during
     * restoration; confirm against the record converter used by the state manager.
     */
    @Test
    public void shouldConvertValuesIfStoreImplementsTimestampedBytesStore() {
        initializeConsumer(1L, 0L, t2);
        stateManager.initialize();
        stateManager.register(store2, stateRestoreCallback);

        final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
        Assert.assertEquals(3, restoredRecord.key.length);
        Assert.assertEquals(13, restoredRecord.value.length);
    }

    @Test
    public void shouldConvertValuesIfInnerStoreImplementsTimestampedBytesStore() {
        initializeConsumer(1L, 0L, t2);
        stateManager.initialize();
        stateManager.register(
            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store2) { },
            stateRestoreCallback);

        final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
        Assert.assertEquals(3, restoredRecord.key.length);
        Assert.assertEquals(13, restoredRecord.value.length);
    }

    @Test
    public void shouldRestoreRecordsUpToHighwatermark() {
        initializeConsumer(2L, 0L, t1);
        stateManager.initialize();
        stateManager.register(store1, stateRestoreCallback);
        Assert.assertEquals(2, stateRestoreCallback.restored.size());
    }

    /** The consumer is primed with a poll exception; both records must still be restored. */
    @Test
    public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {
        initializeConsumer(2L, 0L, t1);
        consumer.setPollException(new InvalidOffsetException("Try Again!") {
            @Override
            public Set<TopicPartition> partitions() {
                return Collections.singleton(GlobalStateManagerImplTest.this.t1);
            }
        });
        stateManager.initialize();
        stateManager.register(store1, stateRestoreCallback);
        Assert.assertEquals(2, stateRestoreCallback.restored.size());
    }

    /** Five records starting at offset 1: the listener must see start 1, end 6 and 5 restored. */
    @Test
    public void shouldListenForRestoreEvents() {
        initializeConsumer(5L, 1L, t1);
        stateManager.initialize();
        stateManager.register(store1, stateRestoreCallback);

        MatcherAssert.assertThat(stateRestoreListener.restoreStartOffset, CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(stateRestoreListener.restoreEndOffset, CoreMatchers.equalTo(6L));
        MatcherAssert.assertThat(stateRestoreListener.totalNumRestored, CoreMatchers.equalTo(5L));
        MatcherAssert.assertThat(stateRestoreListener.storeNameCalledStates.get("restore_start"), CoreMatchers.equalTo(store1.name()));
        MatcherAssert.assertThat(stateRestoreListener.storeNameCalledStates.get("restore_batch"), CoreMatchers.equalTo(store1.name()));
        MatcherAssert.assertThat(stateRestoreListener.storeNameCalledStates.get("restore_end"), CoreMatchers.equalTo(store1.name()));
    }

    /** Records sit at offsets 5..9 and the checkpoint is 5, so all five must be restored. */
    @Test
    public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws IOException {
        initializeConsumer(5L, 5L, t1);
        new OffsetCheckpoint(checkpointFile).write(Collections.singletonMap(t1, 5L));

        stateManager.initialize();
        stateManager.register(store1, stateRestoreCallback);
        Assert.assertEquals(5, stateRestoreCallback.restored.size());
    }

    @Test
    public void shouldFlushStateStores() {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        stateManager.register(store1, stateRestoreCallback);
        initializeConsumer(1L, 0L, t2);
        stateManager.register(store2, stateRestoreCallback);

        stateManager.flush();
        Assert.assertTrue(store1.flushed);
        Assert.assertTrue(store2.flushed);
    }

    @Test(expected=ProcessorStateException.class)
    public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        stateManager.register(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
            @Override
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        }, stateRestoreCallback);
        stateManager.flush();
    }

    @Test
    public void shouldCloseStateStores() throws IOException {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        stateManager.register(store1, stateRestoreCallback);
        initializeConsumer(1L, 0L, t2);
        stateManager.register(store2, stateRestoreCallback);

        stateManager.close(true);
        Assert.assertFalse(store1.isOpen());
        Assert.assertFalse(store2.isOpen());
    }

    @Test(expected=ProcessorStateException.class)
    public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        stateManager.register(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        }, stateRestoreCallback);
        stateManager.close(true);
    }

    @Test
    public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
        stateManager.initialize();
        try {
            stateManager.register(store1, null);
            Assert.fail("should have thrown due to null callback");
        } catch (final IllegalArgumentException expected) {
            // expected: a null restore callback is rejected
        }
    }

    /** After close, an independent {@link StateDirectory} must be able to take the global lock. */
    @Test
    public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
        stateManager.initialize();
        stateManager.close(true);

        final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true);
        try {
            Assert.assertTrue(stateDir.lockGlobalState());
        } finally {
            stateDir.unlockGlobalState();
        }
    }

    /** The registered store throws if it is closed while already closed; a second close must not reach it. */
    @Test
    public void shouldNotCloseStoresIfCloseAlreadyCalled() throws IOException {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        stateManager.register(new NoOpReadOnlyStore<Object, Object>(storeName1) {
            @Override
            public void close() {
                if (!isOpen()) {
                    throw new RuntimeException("store already closed");
                }
                super.close();
            }
        }, stateRestoreCallback);

        stateManager.close(true);
        stateManager.close(true);
    }

    /** The first store throws from close(); the second store must be closed regardless. */
    @Test
    public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws IOException {
        stateManager.initialize();
        initializeConsumer(1L, 0L, t1);
        final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<Object, Object>(storeName1) {
            @Override
            public void close() {
                super.close();
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.register(store, stateRestoreCallback);
        initializeConsumer(1L, 0L, t2);
        stateManager.register(store2, stateRestoreCallback);

        try {
            stateManager.close(true);
        } catch (final ProcessorStateException expected) {
            // tolerated: the failing store above may surface as a ProcessorStateException
        }
        Assert.assertFalse(store.isOpen());
        Assert.assertFalse(store2.isOpen());
    }

    /** A corrupt checkpoint may make initialize() fail, but the global lock must be free afterwards. */
    @Test
    public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws IOException {
        writeCorruptCheckpoint();
        try {
            stateManager.initialize();
        } catch (final StreamsException expected) {
            // tolerated: only the lock state checked below matters to this test
        }

        final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true);
        try {
            Assert.assertTrue(stateDir.lockGlobalState());
        } finally {
            stateDir.unlockGlobalState();
        }
    }

    @Test
    public void shouldCheckpointOffsets() throws IOException {
        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
        stateManager.initialize();

        stateManager.checkpoint(offsets);

        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
        MatcherAssert.assertThat(result, CoreMatchers.equalTo(offsets));
        MatcherAssert.assertThat(stateManager.checkpointed(), CoreMatchers.equalTo(offsets));
    }

    /** Checkpointing only t1 must leave the previously recorded offset of t2 untouched. */
    @Test
    public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() {
        stateManager.initialize();
        initializeConsumer(10L, 0L, t1);
        stateManager.register(store1, stateRestoreCallback);
        initializeConsumer(20L, 0L, t2);
        stateManager.register(store2, stateRestoreCallback);

        final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed();
        stateManager.checkpoint(Collections.singletonMap(t1, 101L));
        final Map<TopicPartition, Long> updatedCheckpoint = stateManager.checkpointed();

        MatcherAssert.assertThat(updatedCheckpoint.get(t2), CoreMatchers.equalTo(initialCheckpoint.get(t2)));
        MatcherAssert.assertThat(updatedCheckpoint.get(t1), CoreMatchers.equalTo(101L));
    }

    /**
     * Feeds one record with a null key and one with a real key; exactly one record
     * (a list equal to the singleton of its own first element) must reach the callback.
     */
    @Test
    public void shouldSkipNullKeysWhenRestoring() {
        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
        consumer.assign(Collections.singletonList(t1));
        consumer.updateEndOffsets(Collections.singletonMap(t1, 3L));
        consumer.updateBeginningOffsets(Collections.singletonMap(t1, 1L));
        consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 1L, null, "null".getBytes()));
        final byte[] expectedKey = "key".getBytes();
        final byte[] expectedValue = "value".getBytes();
        consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 2L, expectedKey, expectedValue));

        stateManager.initialize();
        stateManager.register(store1, stateRestoreCallback);

        final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0);
        MatcherAssert.assertThat(
            stateRestoreCallback.restored,
            CoreMatchers.equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value))));
    }

    /** Ten records restored from offset 0: the checkpoint must record offset 10 for t1. */
    @Test
    public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
        stateManager.initialize();
        initializeConsumer(10L, 0L, t1);
        stateManager.register(store1, stateRestoreCallback);
        stateManager.checkpoint(Collections.emptyMap());
        stateManager.close(true);

        final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
        MatcherAssert.assertThat(checkpointMap, CoreMatchers.equalTo(Collections.singletonMap(t1, 10L)));
        MatcherAssert.assertThat(readOffsetsCheckpoint(), CoreMatchers.equalTo(checkpointMap));
    }

    /** store3 is created without the {@code rocksdbStore} flag; its offsets must not reach the file. */
    @Test
    public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
        stateManager.initialize();
        initializeConsumer(10L, 0L, t3);
        stateManager.register(store3, stateRestoreCallback);
        stateManager.close(true);

        MatcherAssert.assertThat(readOffsetsCheckpoint(), CoreMatchers.equalTo(Collections.emptyMap()));
    }

    /** Reads the offsets currently stored in the checkpoint file. */
    private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
        return new OffsetCheckpoint(checkpointFile).read();
    }

    @Test
    public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
        stateManager = new GlobalStateManagerImpl(
            new LogContext("mock"),
            topology,
            consumer,
            new StateDirectory(streamsConfig, time, true) {
                @Override
                public boolean lockGlobalState() throws IOException {
                    throw new IOException("KABOOM!");
                }
            },
            stateRestoreListener,
            streamsConfig);

        try {
            stateManager.initialize();
            Assert.fail("Should have thrown LockException");
        } catch (final LockException expected) {
            // expected: the IOException from locking is reported as a LockException
        }
    }

    /**
     * Constructs a state manager on a consumer whose {@code endOffsets} always times out,
     * with the {@code retries} setting configured to 2.
     *
     * <p>NOTE(review): the call-count assertion only runs if the constructor throws a
     * {@link StreamsException}; if construction succeeds, the test passes without asserting
     * anything. Confirm that this is the intended coverage.
     */
    @Test
    public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
        final int retries = 2;
        final AtomicInteger numberOfCalls = new AtomicInteger(0);
        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
                numberOfCalls.incrementAndGet();
                throw new TimeoutException();
            }
        };
        final Properties props = newStreamsProperties();
        props.put(StreamsConfig.RETRIES_CONFIG, retries);
        streamsConfig = new StreamsConfig(props);

        try {
            new GlobalStateManagerImpl(
                new LogContext("mock"),
                topology,
                consumer,
                stateDirectory,
                stateRestoreListener,
                streamsConfig);
        } catch (final StreamsException expected) {
            Assert.assertEquals(retries, numberOfCalls.get());
        }
    }

    /**
     * Constructs a state manager on a consumer whose {@code partitionsFor} always times out,
     * with the {@code retries} setting configured to 2.
     *
     * <p>NOTE(review): the call-count assertion only runs if the constructor throws a
     * {@link StreamsException}; if construction succeeds, the test passes without asserting
     * anything. Confirm that this is the intended coverage.
     */
    @Test
    public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
        final int retries = 2;
        final AtomicInteger numberOfCalls = new AtomicInteger(0);
        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public synchronized List<PartitionInfo> partitionsFor(String topic) {
                numberOfCalls.incrementAndGet();
                throw new TimeoutException();
            }
        };
        final Properties props = newStreamsProperties();
        props.put(StreamsConfig.RETRIES_CONFIG, retries);
        streamsConfig = new StreamsConfig(props);

        try {
            new GlobalStateManagerImpl(
                new LogContext("mock"),
                topology,
                consumer,
                stateDirectory,
                stateRestoreListener,
                streamsConfig);
        } catch (final StreamsException expected) {
            Assert.assertEquals(retries, numberOfCalls.get());
        }
    }

    /**
     * Drops a marker file into each store's directory, re-initializes the stores for t1 and t3,
     * and checks that only the marker files of those two stores are gone.
     */
    @Test
    public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOException {
        final String globalDir = stateDirectory.globalStateDir().getAbsolutePath();
        final File storeDirectory1 = new File(globalDir + File.separator + "rocksdb" + File.separator + storeName1);
        final File storeDirectory2 = new File(globalDir + File.separator + "rocksdb" + File.separator + storeName2);
        final File storeDirectory3 = new File(globalDir + File.separator + storeName3);
        final File storeDirectory4 = new File(globalDir + File.separator + storeName4);
        final File testFile1 = new File(storeDirectory1.getAbsolutePath() + File.separator + "testFile");
        final File testFile2 = new File(storeDirectory2.getAbsolutePath() + File.separator + "testFile");
        final File testFile3 = new File(storeDirectory3.getAbsolutePath() + File.separator + "testFile");
        final File testFile4 = new File(storeDirectory4.getAbsolutePath() + File.separator + "testFile");

        // All four topics exist but are empty: beginning and end offsets are both 0.
        final Map<TopicPartition, Long> zeroOffsets = new HashMap<>();
        for (final TopicPartition partition : Arrays.asList(t1, t2, t3, t4)) {
            consumer.updatePartitions(
                partition.topic(),
                Collections.singletonList(new PartitionInfo(partition.topic(), partition.partition(), null, null, null)));
            zeroOffsets.put(partition, 0L);
        }
        consumer.updateBeginningOffsets(zeroOffsets);
        consumer.updateEndOffsets(zeroOffsets);

        stateManager.initialize();
        stateManager.register(store1, stateRestoreCallback);
        stateManager.register(store2, stateRestoreCallback);
        stateManager.register(store3, stateRestoreCallback);
        stateManager.register(store4, stateRestoreCallback);

        testFile1.createNewFile();
        Assert.assertTrue(testFile1.exists());
        testFile2.createNewFile();
        Assert.assertTrue(testFile2.exists());
        testFile3.createNewFile();
        Assert.assertTrue(testFile3.exists());
        testFile4.createNewFile();
        Assert.assertTrue(testFile4.exists());

        stateManager.reinitializeStateStoresForPartitions(Arrays.asList(t1, t3), processorContext);

        Assert.assertFalse(testFile1.exists());
        Assert.assertTrue(testFile2.exists());
        Assert.assertFalse(testFile3.exists());
        Assert.assertTrue(testFile4.exists());
    }

    /** Overwrites the checkpoint file with content whose third line is not a valid entry. */
    private void writeCorruptCheckpoint() throws IOException {
        try (OutputStream stream = Files.newOutputStream(checkpointFile.toPath())) {
            stream.write("0\n1\nfoo".getBytes());
        }
    }

    /**
     * Prepares the mock consumer for restoring a single partition: registers the partition,
     * assigns it, sets beginning/end offsets and queues {@code numRecords} records
     * ({@code "key"} / {@code "value"}) at consecutive offsets starting from {@code startOffset}.
     *
     * @param numRecords     number of records to queue
     * @param startOffset    beginning offset and offset of the first record
     * @param topicPartition partition the records belong to
     */
    private void initializeConsumer(long numRecords, long startOffset, TopicPartition topicPartition) {
        consumer.updatePartitions(
            topicPartition.topic(),
            Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null)));
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, startOffset + numRecords));
        consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, startOffset));

        for (long i = 0; i < numRecords; i++) {
            consumer.addRecord(new ConsumerRecord<>(
                topicPartition.topic(),
                topicPartition.partition(),
                startOffset + i,
                "key".getBytes(),
                "value".getBytes()));
        }
    }

    /**
     * Writes a checkpoint file holding offset 1 for t1.
     *
     * @return the offsets that were written
     */
    private Map<TopicPartition, Long> writeCheckpoint() throws IOException {
        final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 1L);
        new OffsetCheckpoint(checkpointFile).write(expected);
        return expected;
    }

    /** A no-op store that is additionally marked as a {@link TimestampedBytesStore}. */
    private static class ConverterStore<K, V>
    extends NoOpReadOnlyStore<K, V>
    implements TimestampedBytesStore {
        ConverterStore(String name, boolean rocksdbStore) {
            super(name, rocksdbStore);
        }
    }

    /** Restore callback that records every key/value pair it is handed, in arrival order. */
    private static class TheStateRestoreCallback
    implements StateRestoreCallback {
        private final List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();

        @Override
        public void restore(byte[] key, byte[] value) {
            restored.add(KeyValue.pair(key, value));
        }
    }
}

