package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockBatchingStateRestoreListener;
import org.apache.kafka.test.MockStateStore;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.class */
public class ProcessorStateManagerTest {
    private final Set<TopicPartition> noPartitions = Collections.emptySet();
    private final String applicationId = "test-application";
    private final String persistentStoreName = "persistentStore";
    private final String nonPersistentStoreName = "nonPersistentStore";
    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic("test-application", "persistentStore");
    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic("test-application", "nonPersistentStore");
    private final MockStateStore persistentStore = new MockStateStore("persistentStore", true);
    private final MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", false);
    private final TopicPartition persistentStorePartition = new TopicPartition(this.persistentStoreTopicName, 1);
    private final String storeName = "mockStateStore";
    private final String changelogTopic = ProcessorStateManager.storeChangelogTopic("test-application", "mockStateStore");
    private final TopicPartition changelogTopicPartition = new TopicPartition(this.changelogTopic, 0);
    private final TaskId taskId = new TaskId(0, 1);
    private final MockChangelogReader changelogReader = new MockChangelogReader();
    private final MockStateStore mockStateStore = new MockStateStore("mockStateStore", true);
    private final byte[] key = {0, 0, 0, 1};
    private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8"));
    private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(this.changelogTopic, 0, 0, this.key, this.value);
    private final LogContext logContext = new LogContext("process-state-manager-test ");
    private File baseDir;
    private File checkpointFile;
    private OffsetCheckpoint checkpoint;
    private StateDirectory stateDirectory;

    @Before
    public void setup() {
        this.baseDir = TestUtils.tempDirectory();
        this.stateDirectory = new StateDirectory(new StreamsConfig(new Properties() { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.1
            {
                put("application.id", "test-application");
                put("bootstrap.servers", "dummy:1234");
                put("state.dir", ProcessorStateManagerTest.this.baseDir.getPath());
            }
        }), new MockTime());
        this.checkpointFile = new File(this.stateDirectory.directoryForTask(this.taskId), ".checkpoint");
        this.checkpoint = new OffsetCheckpoint(this.checkpointFile);
    }

    @After
    public void cleanup() throws IOException {
        Utils.delete(this.baseDir);
    }

    @Test
    public void shouldRestoreStoreWithBatchingRestoreSpecification() throws Exception {
        TaskId taskId = new TaskId(0, 2);
        MockBatchingStateRestoreListener mockBatchingStateRestoreListener = new MockBatchingStateRestoreListener();
        KeyValue pair = KeyValue.pair(this.key, this.value);
        MockStateStore persistentStore = getPersistentStore();
        ProcessorStateManager standByStateManager = getStandByStateManager(taskId);
        try {
            standByStateManager.register(persistentStore, mockBatchingStateRestoreListener);
            standByStateManager.updateStandbyStates(this.persistentStorePartition, Collections.singletonList(this.consumerRecord));
            MatcherAssert.assertThat(Integer.valueOf(mockBatchingStateRestoreListener.getRestoredRecords().size()), Is.is(1));
            Assert.assertTrue(mockBatchingStateRestoreListener.getRestoredRecords().contains(pair));
            standByStateManager.close(Collections.emptyMap());
        } catch (Throwable th) {
            standByStateManager.close(Collections.emptyMap());
            throw th;
        }
    }

    @Test
    public void shouldRestoreStoreWithSinglePutRestoreSpecification() throws Exception {
        TaskId taskId = new TaskId(0, 2);
        MockStateStore persistentStore = getPersistentStore();
        ProcessorStateManager standByStateManager = getStandByStateManager(taskId);
        try {
            standByStateManager.register(persistentStore, persistentStore.stateRestoreCallback);
            standByStateManager.updateStandbyStates(this.persistentStorePartition, Collections.singletonList(this.consumerRecord));
            MatcherAssert.assertThat(Integer.valueOf(persistentStore.keys.size()), Is.is(1));
            Assert.assertTrue(persistentStore.keys.contains(1));
            standByStateManager.close(Collections.emptyMap());
        } catch (Throwable th) {
            standByStateManager.close(Collections.emptyMap());
            throw th;
        }
    }

    @Test
    public void testRegisterPersistentStore() throws IOException {
        TaskId taskId = new TaskId(0, 2);
        MockStateStore persistentStore = getPersistentStore();
        ProcessorStateManager processorStateManager = new ProcessorStateManager(taskId, this.noPartitions, false, this.stateDirectory, new HashMap<String, String>() { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.2
            {
                put("persistentStore", ProcessorStateManagerTest.this.persistentStoreTopicName);
                put("nonPersistentStore", "nonPersistentStore");
            }
        }, this.changelogReader, false, this.logContext);
        try {
            processorStateManager.register(persistentStore, persistentStore.stateRestoreCallback);
            Assert.assertTrue(this.changelogReader.wasRegistered(new TopicPartition(this.persistentStoreTopicName, 2)));
            processorStateManager.close(Collections.emptyMap());
        } catch (Throwable th) {
            processorStateManager.close(Collections.emptyMap());
            throw th;
        }
    }

    @Test
    public void testRegisterNonPersistentStore() throws IOException {
        MockStateStore mockStateStore = new MockStateStore("nonPersistentStore", false);
        ProcessorStateManager processorStateManager = new ProcessorStateManager(new TaskId(0, 2), this.noPartitions, false, this.stateDirectory, new HashMap<String, String>() { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.3
            {
                put("persistentStore", ProcessorStateManagerTest.this.persistentStoreTopicName);
                put("nonPersistentStore", ProcessorStateManagerTest.this.nonPersistentStoreTopicName);
            }
        }, this.changelogReader, false, this.logContext);
        try {
            processorStateManager.register(mockStateStore, mockStateStore.stateRestoreCallback);
            Assert.assertTrue(this.changelogReader.wasRegistered(new TopicPartition(this.nonPersistentStoreTopicName, 2)));
        } finally {
            processorStateManager.close(Collections.emptyMap());
        }
    }

    @Test
    public void testChangeLogOffsets() throws IOException {
        TaskId taskId = new TaskId(0, 0);
        String storeChangelogTopic = ProcessorStateManager.storeChangelogTopic("test-application", "store1");
        String storeChangelogTopic2 = ProcessorStateManager.storeChangelogTopic("test-application", "store2");
        String storeChangelogTopic3 = ProcessorStateManager.storeChangelogTopic("test-application", "store3");
        HashMap hashMap = new HashMap();
        hashMap.put("store1", storeChangelogTopic);
        hashMap.put("store2", storeChangelogTopic2);
        hashMap.put("store3", storeChangelogTopic3);
        new OffsetCheckpoint(new File(this.stateDirectory.directoryForTask(taskId), ".checkpoint")).write(Collections.singletonMap(new TopicPartition(storeChangelogTopic, 0), 10L));
        TopicPartition topicPartition = new TopicPartition(storeChangelogTopic, 0);
        TopicPartition topicPartition2 = new TopicPartition(storeChangelogTopic2, 0);
        TopicPartition topicPartition3 = new TopicPartition(storeChangelogTopic3, 1);
        MockStateStore mockStateStore = new MockStateStore("store1", true);
        MockStateStore mockStateStore2 = new MockStateStore("store2", true);
        MockStateStore mockStateStore3 = new MockStateStore("store3", true);
        ProcessorStateManager processorStateManager = new ProcessorStateManager(taskId, Utils.mkSet(new TopicPartition[]{new TopicPartition(storeChangelogTopic3, 1)}), true, this.stateDirectory, hashMap, this.changelogReader, false, this.logContext);
        try {
            processorStateManager.register(mockStateStore, mockStateStore.stateRestoreCallback);
            processorStateManager.register(mockStateStore2, mockStateStore2.stateRestoreCallback);
            processorStateManager.register(mockStateStore3, mockStateStore3.stateRestoreCallback);
            Map checkpointed = processorStateManager.checkpointed();
            Assert.assertEquals(3L, checkpointed.size());
            Assert.assertTrue(checkpointed.containsKey(topicPartition));
            Assert.assertTrue(checkpointed.containsKey(topicPartition2));
            Assert.assertTrue(checkpointed.containsKey(topicPartition3));
            Assert.assertEquals(10L, ((Long) checkpointed.get(topicPartition)).longValue());
            Assert.assertEquals(-1L, ((Long) checkpointed.get(topicPartition2)).longValue());
            Assert.assertEquals(-1L, ((Long) checkpointed.get(topicPartition3)).longValue());
            processorStateManager.close(Collections.emptyMap());
        } catch (Throwable th) {
            processorStateManager.close(Collections.emptyMap());
            throw th;
        }
    }

    @Test
    public void testGetStore() throws IOException {
        MockStateStore mockStateStore = new MockStateStore("nonPersistentStore", false);
        ProcessorStateManager processorStateManager = new ProcessorStateManager(new TaskId(0, 1), this.noPartitions, false, this.stateDirectory, Collections.emptyMap(), this.changelogReader, false, this.logContext);
        try {
            processorStateManager.register(mockStateStore, mockStateStore.stateRestoreCallback);
            Assert.assertNull(processorStateManager.getStore("noSuchStore"));
            Assert.assertEquals(mockStateStore, processorStateManager.getStore("nonPersistentStore"));
        } finally {
            processorStateManager.close(Collections.emptyMap());
        }
    }

    @Test
    public void testFlushAndClose() throws IOException {
        this.checkpoint.write(Collections.emptyMap());
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(this.persistentStoreTopicName, 1), 123L);
        hashMap.put(new TopicPartition(this.nonPersistentStoreTopicName, 1), 456L);
        hashMap.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic("test-application", "otherTopic"), 1), 789L);
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, this.noPartitions, false, this.stateDirectory, new HashMap<String, String>() { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.4
            {
                put("persistentStore", ProcessorStateManagerTest.this.persistentStoreTopicName);
                put("nonPersistentStore", ProcessorStateManagerTest.this.nonPersistentStoreTopicName);
            }
        }, this.changelogReader, false, this.logContext);
        try {
            Assert.assertFalse(this.checkpointFile.exists());
            processorStateManager.register(this.persistentStore, this.persistentStore.stateRestoreCallback);
            processorStateManager.register(this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            Assert.assertTrue(this.persistentStore.flushed);
            Assert.assertTrue(this.persistentStore.closed);
            Assert.assertTrue(this.nonPersistentStore.flushed);
            Assert.assertTrue(this.nonPersistentStore.closed);
            Assert.assertTrue(this.checkpointFile.exists());
            Map read = this.checkpoint.read();
            Assert.assertEquals(1L, read.size());
            Assert.assertEquals(new Long(124L), read.get(new TopicPartition(this.persistentStoreTopicName, 1)));
        } finally {
            processorStateManager.flush();
            processorStateManager.close(hashMap);
        }
    }

    @Test
    public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws IOException {
        ProcessorStateManager processorStateManager = new ProcessorStateManager(new TaskId(0, 1), this.noPartitions, false, this.stateDirectory, Collections.emptyMap(), this.changelogReader, false, this.logContext);
        processorStateManager.register(this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
        Assert.assertNotNull(processorStateManager.getStore("nonPersistentStore"));
    }

    @Test
    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws IOException {
        Map singletonMap = Collections.singletonMap(this.persistentStorePartition, 99L);
        this.checkpoint.write(singletonMap);
        MockStateStore mockStateStore = new MockStateStore("persistentStore", true);
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, this.noPartitions, false, this.stateDirectory, Collections.emptyMap(), this.changelogReader, false, this.logContext);
        processorStateManager.register(mockStateStore, mockStateStore.stateRestoreCallback);
        processorStateManager.close((Map) null);
        MatcherAssert.assertThat(this.checkpoint.read(), CoreMatchers.equalTo(singletonMap));
    }

    @Test
    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws IOException {
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, this.noPartitions, false, this.stateDirectory, Collections.singletonMap(this.persistentStore.name(), this.persistentStoreTopicName), this.changelogReader, false, this.logContext);
        processorStateManager.register(this.persistentStore, this.persistentStore.stateRestoreCallback);
        processorStateManager.checkpoint(Collections.singletonMap(this.persistentStorePartition, 10L));
        MatcherAssert.assertThat(this.checkpoint.read(), CoreMatchers.equalTo(Collections.singletonMap(this.persistentStorePartition, 11L)));
    }

    @Test
    public void shouldWriteCheckpointForStandbyReplica() throws IOException {
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, this.noPartitions, true, this.stateDirectory, Collections.singletonMap(this.persistentStore.name(), this.persistentStoreTopicName), this.changelogReader, false, this.logContext);
        processorStateManager.register(this.persistentStore, this.persistentStore.stateRestoreCallback);
        byte[] serialize = Serdes.Integer().serializer().serialize("", 10);
        processorStateManager.updateStandbyStates(this.persistentStorePartition, Collections.singletonList(new ConsumerRecord(this.persistentStorePartition.topic(), this.persistentStorePartition.partition(), 888L, serialize, serialize)));
        processorStateManager.checkpoint(Collections.emptyMap());
        MatcherAssert.assertThat(this.checkpoint.read(), CoreMatchers.equalTo(Collections.singletonMap(this.persistentStorePartition, 889L)));
    }

    @Test
    public void shouldNotWriteCheckpointForNonPersistent() throws IOException {
        TopicPartition topicPartition = new TopicPartition(this.nonPersistentStoreTopicName, 1);
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, this.noPartitions, true, this.stateDirectory, Collections.singletonMap("nonPersistentStore", this.nonPersistentStoreTopicName), this.changelogReader, false, this.logContext);
        processorStateManager.register(this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
        processorStateManager.checkpoint(Collections.singletonMap(topicPartition, 876L));
        MatcherAssert.assertThat(this.checkpoint.read(), CoreMatchers.equalTo(Collections.emptyMap()));
    }

    @Test
    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOException {
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, this.noPartitions, true, this.stateDirectory, Collections.emptyMap(), this.changelogReader, false, this.logContext);
        processorStateManager.register(this.persistentStore, this.persistentStore.stateRestoreCallback);
        processorStateManager.checkpoint(Collections.singletonMap(this.persistentStorePartition, 987L));
        MatcherAssert.assertThat(this.checkpoint.read(), CoreMatchers.equalTo(Collections.emptyMap()));
    }

    @Test
    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws IOException {
        try {
            new ProcessorStateManager(this.taskId, this.noPartitions, false, this.stateDirectory, Collections.emptyMap(), this.changelogReader, false, this.logContext).register(new MockStateStore(".checkpoint", true), (StateRestoreCallback) null);
            Assert.fail("should have thrown illegal argument exception when store name same as checkpoint file");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws IOException {
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, this.noPartitions, false, this.stateDirectory, Collections.emptyMap(), this.changelogReader, false, this.logContext);
        processorStateManager.register(this.mockStateStore, (StateRestoreCallback) null);
        try {
            processorStateManager.register(this.mockStateStore, (StateRestoreCallback) null);
            Assert.fail("should have thrown illegal argument exception when store with same name already registered");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() throws IOException {
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, Collections.singleton(this.changelogTopicPartition), false, this.stateDirectory, Collections.singletonMap("mockStateStore", this.changelogTopic), this.changelogReader, false, this.logContext);
        MockStateStore mockStateStore = new MockStateStore("mockStateStore", true) { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.5
            @Override // org.apache.kafka.test.MockStateStore
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        };
        processorStateManager.register(mockStateStore, mockStateStore.stateRestoreCallback);
        try {
            processorStateManager.flush();
            Assert.fail("Should throw ProcessorStateException if store flush throws exception");
        } catch (ProcessorStateException e) {
        }
    }

    @Test
    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws IOException {
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, Collections.singleton(this.changelogTopicPartition), false, this.stateDirectory, Collections.singletonMap("mockStateStore", this.changelogTopic), this.changelogReader, false, this.logContext);
        MockStateStore mockStateStore = new MockStateStore("mockStateStore", true) { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.6
            @Override // org.apache.kafka.test.MockStateStore
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        processorStateManager.register(mockStateStore, mockStateStore.stateRestoreCallback);
        try {
            processorStateManager.close(Collections.emptyMap());
            Assert.fail("Should throw ProcessorStateException if store close throws exception");
        } catch (ProcessorStateException e) {
        }
    }

    @Test
    public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws IOException {
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, Collections.singleton(this.changelogTopicPartition), false, this.stateDirectory, Collections.singletonMap("mockStateStore", this.changelogTopic), this.changelogReader, false, this.logContext);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        MockStateStore mockStateStore = new MockStateStore("mockStateStore", true) { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.7
            @Override // org.apache.kafka.test.MockStateStore
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        };
        MockStateStore mockStateStore2 = new MockStateStore("mockStateStore2", true) { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.8
            @Override // org.apache.kafka.test.MockStateStore
            public void flush() {
                atomicBoolean.set(true);
            }
        };
        processorStateManager.register(mockStateStore, mockStateStore.stateRestoreCallback);
        processorStateManager.register(mockStateStore2, mockStateStore2.stateRestoreCallback);
        try {
            processorStateManager.flush();
        } catch (ProcessorStateException e) {
        }
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void shouldCloseAllStoresEvenIfStoreThrowsExcepiton() throws IOException {
        ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, Collections.singleton(this.changelogTopicPartition), false, this.stateDirectory, Collections.singletonMap("mockStateStore", this.changelogTopic), this.changelogReader, false, this.logContext);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        MockStateStore mockStateStore = new MockStateStore("mockStateStore", true) { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.9
            @Override // org.apache.kafka.test.MockStateStore
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        MockStateStore mockStateStore2 = new MockStateStore("mockStateStore2", true) { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.10
            @Override // org.apache.kafka.test.MockStateStore
            public void close() {
                atomicBoolean.set(true);
            }
        };
        processorStateManager.register(mockStateStore, mockStateStore.stateRestoreCallback);
        processorStateManager.register(mockStateStore2, mockStateStore2.stateRestoreCallback);
        try {
            processorStateManager.close(Collections.emptyMap());
        } catch (ProcessorStateException e) {
        }
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException {
        this.checkpoint.write(Collections.singletonMap(new TopicPartition(this.persistentStoreTopicName, 1), 123L));
        Assert.assertTrue(this.checkpointFile.exists());
        ProcessorStateManager processorStateManager = null;
        try {
            processorStateManager = new ProcessorStateManager(this.taskId, this.noPartitions, false, this.stateDirectory, Collections.emptyMap(), this.changelogReader, true, this.logContext);
            Assert.assertFalse(this.checkpointFile.exists());
            if (processorStateManager != null) {
                processorStateManager.close((Map) null);
            }
        } catch (Throwable th) {
            if (processorStateManager != null) {
                processorStateManager.close((Map) null);
            }
            throw th;
        }
    }

    @Test
    public void shouldSuccessfullyReInitializeStateStoresWithEosDisable() throws Exception {
        shouldSuccessfullyReInitializeStateStores(false);
    }

    @Test
    public void shouldSuccessfullyReInitializeStateStoresWithEosEnable() throws Exception {
        shouldSuccessfullyReInitializeStateStores(true);
    }

    private void shouldSuccessfullyReInitializeStateStores(boolean z) throws Exception {
        List asList = Arrays.asList(this.changelogTopicPartition, new TopicPartition("store2-changelog", 0));
        final ProcessorStateManager processorStateManager = new ProcessorStateManager(this.taskId, asList, false, this.stateDirectory, new HashMap<String, String>() { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.11
            {
                put("mockStateStore", ProcessorStateManagerTest.this.changelogTopic);
                put("store2", "store2-changelog");
            }
        }, this.changelogReader, z, this.logContext);
        MockStateStore mockStateStore = new MockStateStore("mockStateStore", true);
        MockStateStore mockStateStore2 = new MockStateStore("store2", true);
        processorStateManager.register(mockStateStore, mockStateStore.stateRestoreCallback);
        processorStateManager.register(mockStateStore2, mockStateStore2.stateRestoreCallback);
        mockStateStore.initialized = false;
        mockStateStore2.initialized = false;
        processorStateManager.reinitializeStateStoresForPartitions(asList, new NoOpProcessorContext() { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.12
            @Override // org.apache.kafka.test.NoOpProcessorContext
            public void register(StateStore stateStore, boolean z2, StateRestoreCallback stateRestoreCallback) {
                processorStateManager.register(stateStore, stateRestoreCallback);
            }
        });
        Assert.assertTrue(mockStateStore.initialized);
        Assert.assertTrue(mockStateStore2.initialized);
    }

    private ProcessorStateManager getStandByStateManager(TaskId taskId) throws IOException {
        return new ProcessorStateManager(taskId, this.noPartitions, true, this.stateDirectory, new HashMap<String, String>() { // from class: org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest.13
            {
                put("persistentStore", ProcessorStateManagerTest.this.persistentStoreTopicName);
            }
        }, this.changelogReader, false, this.logContext);
    }

    private MockStateStore getPersistentStore() {
        return new MockStateStore("persistentStore", true);
    }
}
