/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ProcessorStateManagerTest {
    private File baseDir;
    private StateDirectory stateDirectory;
    private final Set<TopicPartition> noPartitions = Collections.emptySet();
    private final String applicationId = "test-application";
    private final String stateDir = "test";
    private final String persistentStoreName = "persistentStore";
    private final String nonPersistentStoreName = "nonPersistentStore";
    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"persistentStore");
    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"nonPersistentStore");

    @Before
    public void setup() {
        this.baseDir = TestUtils.tempDirectory();
        this.stateDirectory = new StateDirectory("test-application", this.baseDir.getPath());
    }

    @After
    public void cleanup() {
        Utils.delete((File)this.baseDir);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(expected=StreamsException.class)
    public void testNoTopic() throws IOException {
        MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
        ProcessorStateManager stateMgr = new ProcessorStateManager("test-application", new TaskId(0, 1), this.noPartitions, (Consumer)new MockRestoreConsumer(), false, this.stateDirectory, null, Collections.emptyMap());
        try {
            stateMgr.register((StateStore)mockStateStore, true, mockStateStore.stateRestoreCallback);
        }
        finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRegisterPersistentStore() throws IOException {
        TaskId taskId = new TaskId(0, 2);
        long lastCheckpointedOffset = 10L;
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.stateDirectory.directoryForTask(taskId), ".checkpoint"));
        checkpoint.write(Collections.singletonMap(new TopicPartition(this.persistentStoreTopicName, 2), lastCheckpointedOffset));
        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
        restoreConsumer.updatePartitions(this.persistentStoreTopicName, Utils.mkList((Object[])new PartitionInfo[]{new PartitionInfo(this.persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.persistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])}));
        TopicPartition partition = new TopicPartition(this.persistentStoreTopicName, 2);
        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
        MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
        ProcessorStateManager stateMgr = new ProcessorStateManager("test-application", taskId, this.noPartitions, (Consumer)restoreConsumer, false, this.stateDirectory, null, Collections.emptyMap());
        try {
            restoreConsumer.reset();
            ArrayList<Integer> expectedKeys = new ArrayList<Integer>();
            for (int i = 1; i <= 3; ++i) {
                long offset = i;
                int key = i * 10;
                expectedKeys.add(key);
                restoreConsumer.bufferRecord((ConsumerRecord<Integer, Integer>)new ConsumerRecord(this.persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)key, (Object)0));
            }
            stateMgr.register((StateStore)persistentStore, true, persistentStore.stateRestoreCallback);
            Assert.assertEquals((Object)new TopicPartition(this.persistentStoreTopicName, 2), (Object)restoreConsumer.assignedPartition);
            Assert.assertEquals((long)lastCheckpointedOffset, (long)restoreConsumer.seekOffset);
            Assert.assertFalse((boolean)restoreConsumer.seekToBeginingCalled);
            Assert.assertTrue((boolean)restoreConsumer.seekToEndCalled);
            Assert.assertEquals(expectedKeys, persistentStore.keys);
        }
        finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRegisterNonPersistentStore() throws IOException {
        long lastCheckpointedOffset = 10L;
        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, ".checkpoint"));
        checkpoint.write(Collections.singletonMap(new TopicPartition(this.persistentStoreTopicName, 2), lastCheckpointedOffset));
        restoreConsumer.updatePartitions(this.nonPersistentStoreTopicName, Utils.mkList((Object[])new PartitionInfo[]{new PartitionInfo(this.nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.nonPersistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])}));
        TopicPartition partition = new TopicPartition(this.persistentStoreTopicName, 2);
        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
        MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
        ProcessorStateManager stateMgr = new ProcessorStateManager("test-application", new TaskId(0, 2), this.noPartitions, (Consumer)restoreConsumer, false, this.stateDirectory, null, Collections.emptyMap());
        try {
            restoreConsumer.reset();
            ArrayList<Integer> expectedKeys = new ArrayList<Integer>();
            long offset = -1L;
            for (int i = 1; i <= 3; ++i) {
                offset = i + 100;
                int key = i;
                expectedKeys.add(i);
                restoreConsumer.bufferRecord((ConsumerRecord<Integer, Integer>)new ConsumerRecord(this.nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)key, (Object)0));
            }
            stateMgr.register((StateStore)nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
            Assert.assertEquals((Object)new TopicPartition(this.nonPersistentStoreTopicName, 2), (Object)restoreConsumer.assignedPartition);
            Assert.assertEquals((long)0L, (long)restoreConsumer.seekOffset);
            Assert.assertTrue((boolean)restoreConsumer.seekToBeginingCalled);
            Assert.assertTrue((boolean)restoreConsumer.seekToEndCalled);
            Assert.assertEquals(expectedKeys, nonPersistentStore.keys);
        }
        finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testChangeLogOffsets() throws IOException {
        TaskId taskId = new TaskId(0, 0);
        long lastCheckpointedOffset = 10L;
        String storeName1 = "store1";
        String storeName2 = "store2";
        String storeName3 = "store3";
        String storeTopicName1 = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)storeName1);
        String storeTopicName2 = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)storeName2);
        String storeTopicName3 = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)storeName3);
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.stateDirectory.directoryForTask(taskId), ".checkpoint"));
        checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
        restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList((Object[])new PartitionInfo[]{new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])}));
        restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList((Object[])new PartitionInfo[]{new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0])}));
        restoreConsumer.updatePartitions(storeTopicName3, Utils.mkList((Object[])new PartitionInfo[]{new PartitionInfo(storeTopicName3, 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(storeTopicName3, 1, Node.noNode(), new Node[0], new Node[0])}));
        TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
        TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
        TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
        HashMap<TopicPartition, Long> endOffsets = new HashMap<TopicPartition, Long>();
        endOffsets.put(partition1, 13L);
        endOffsets.put(partition2, 17L);
        restoreConsumer.updateEndOffsets(endOffsets);
        MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
        MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
        MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
        Set sourcePartitions = Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition(storeTopicName3, 1)});
        ProcessorStateManager stateMgr = new ProcessorStateManager("test-application", taskId, (Collection)sourcePartitions, (Consumer)restoreConsumer, true, this.stateDirectory, null, Collections.emptyMap());
        try {
            restoreConsumer.reset();
            stateMgr.register((StateStore)store1, true, store1.stateRestoreCallback);
            stateMgr.register((StateStore)store2, true, store2.stateRestoreCallback);
            stateMgr.register((StateStore)store3, true, store3.stateRestoreCallback);
            Map changeLogOffsets = stateMgr.checkpointedOffsets();
            Assert.assertEquals((long)3L, (long)changeLogOffsets.size());
            Assert.assertTrue((boolean)changeLogOffsets.containsKey(partition1));
            Assert.assertTrue((boolean)changeLogOffsets.containsKey(partition2));
            Assert.assertTrue((boolean)changeLogOffsets.containsKey(partition3));
            Assert.assertEquals((long)lastCheckpointedOffset, (long)((Long)changeLogOffsets.get(partition1)));
            Assert.assertEquals((long)-1L, (long)((Long)changeLogOffsets.get(partition2)));
            Assert.assertEquals((long)-1L, (long)((Long)changeLogOffsets.get(partition3)));
        }
        finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGetStore() throws IOException {
        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
        restoreConsumer.updatePartitions(this.nonPersistentStoreTopicName, Utils.mkList((Object[])new PartitionInfo[]{new PartitionInfo(this.nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])}));
        MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
        ProcessorStateManager stateMgr = new ProcessorStateManager("test-application", new TaskId(0, 1), this.noPartitions, (Consumer)restoreConsumer, false, this.stateDirectory, null, Collections.emptyMap());
        try {
            stateMgr.register((StateStore)mockStateStore, true, mockStateStore.stateRestoreCallback);
            Assert.assertNull((Object)stateMgr.getStore("noSuchStore"));
            Assert.assertEquals((Object)mockStateStore, (Object)stateMgr.getStore("nonPersistentStore"));
        }
        finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFlushAndClose() throws IOException {
        TaskId taskId = new TaskId(0, 1);
        File checkpointFile = new File(this.stateDirectory.directoryForTask(taskId), ".checkpoint");
        OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
        oldCheckpoint.write(Collections.emptyMap());
        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
        restoreConsumer.updatePartitions(this.persistentStoreTopicName, Utils.mkList((Object[])new PartitionInfo[]{new PartitionInfo(this.persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])}));
        restoreConsumer.updatePartitions(this.nonPersistentStoreTopicName, Utils.mkList((Object[])new PartitionInfo[]{new PartitionInfo(this.nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])}));
        HashMap<TopicPartition, Long> ackedOffsets = new HashMap<TopicPartition, Long>();
        ackedOffsets.put(new TopicPartition(this.persistentStoreTopicName, 1), 123L);
        ackedOffsets.put(new TopicPartition(this.nonPersistentStoreTopicName, 1), 456L);
        ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"otherTopic"), 1), 789L);
        MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
        MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
        ProcessorStateManager stateMgr = new ProcessorStateManager("test-application", taskId, this.noPartitions, (Consumer)restoreConsumer, false, this.stateDirectory, null, Collections.emptyMap());
        try {
            Assert.assertFalse((boolean)checkpointFile.exists());
            restoreConsumer.reset();
            stateMgr.register((StateStore)persistentStore, true, persistentStore.stateRestoreCallback);
            restoreConsumer.reset();
            stateMgr.register((StateStore)nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
        }
        finally {
            stateMgr.flush((InternalProcessorContext)new MockProcessorContext(StateSerdes.withBuiltinTypes((String)"foo", String.class, String.class), new NoOpRecordCollector()));
            stateMgr.close(ackedOffsets);
        }
        Assert.assertTrue((boolean)persistentStore.flushed);
        Assert.assertTrue((boolean)persistentStore.closed);
        Assert.assertTrue((boolean)nonPersistentStore.flushed);
        Assert.assertTrue((boolean)nonPersistentStore.closed);
        Assert.assertTrue((boolean)checkpointFile.exists());
        OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
        Map checkpointedOffsets = newCheckpoint.read();
        Assert.assertEquals((long)1L, (long)checkpointedOffsets.size());
        Assert.assertEquals((Object)new Long(124L), checkpointedOffsets.get(new TopicPartition(this.persistentStoreTopicName, 1)));
    }

    @Test
    public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
        MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
        ProcessorStateManager stateMgr = new ProcessorStateManager("test-application", new TaskId(0, 1), this.noPartitions, (Consumer)new MockRestoreConsumer(), false, this.stateDirectory, null, Collections.emptyMap());
        stateMgr.register((StateStore)mockStateStore, false, mockStateStore.stateRestoreCallback);
        Assert.assertNotNull((Object)stateMgr.getStore("nonPersistentStore"));
    }

    public static class MockRestoreConsumer
    extends MockConsumer<byte[], byte[]> {
        private final Serializer<Integer> serializer = new IntegerSerializer();
        public TopicPartition assignedPartition = null;
        public TopicPartition seekPartition = null;
        public long seekOffset = -1L;
        public boolean seekToBeginingCalled = false;
        public boolean seekToEndCalled = false;
        private long endOffset = 0L;
        private long currentOffset = 0L;
        private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList();

        MockRestoreConsumer() {
            super(OffsetResetStrategy.EARLIEST);
            this.reset();
        }

        public void reset() {
            this.assignedPartition = null;
            this.seekOffset = -1L;
            this.seekToBeginingCalled = false;
            this.seekToEndCalled = false;
            this.endOffset = 0L;
            this.recordBuffer.clear();
        }

        public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
            this.recordBuffer.add((ConsumerRecord<byte[], byte[]>)new ConsumerRecord(record.topic(), record.partition(), record.offset(), 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.serializer.serialize(record.topic(), record.key()), (Object)this.serializer.serialize(record.topic(), record.value())));
            this.endOffset = record.offset();
            super.updateEndOffsets(Collections.singletonMap(this.assignedPartition, this.endOffset));
        }

        public synchronized void assign(Collection<TopicPartition> partitions) {
            int numPartitions = partitions.size();
            if (numPartitions > 1) {
                throw new IllegalArgumentException("RestoreConsumer: more than one partition specified");
            }
            if (numPartitions == 1) {
                if (this.assignedPartition != null) {
                    throw new IllegalStateException("RestoreConsumer: partition already assigned");
                }
                this.assignedPartition = partitions.iterator().next();
                super.updateBeginningOffsets(Collections.singletonMap(this.assignedPartition, 0L));
            }
            super.assign(partitions);
        }

        public ConsumerRecords<byte[], byte[]> poll(long timeout) {
            for (ConsumerRecord<byte[], byte[]> record : this.recordBuffer) {
                super.addRecord(record);
            }
            this.recordBuffer.clear();
            ConsumerRecords records = super.poll(timeout);
            List partitionRecords = records.records(this.assignedPartition);
            for (ConsumerRecord record : partitionRecords) {
                this.currentOffset = record.offset();
            }
            return records;
        }

        public synchronized long position(TopicPartition partition) {
            if (!partition.equals((Object)this.assignedPartition)) {
                throw new IllegalStateException("RestoreConsumer: unassigned partition");
            }
            return this.currentOffset;
        }

        public synchronized void seek(TopicPartition partition, long offset) {
            if (offset < 0L) {
                throw new IllegalArgumentException("RestoreConsumer: offset should not be negative");
            }
            if (this.seekOffset >= 0L) {
                throw new IllegalStateException("RestoreConsumer: offset already seeked");
            }
            this.seekPartition = partition;
            this.seekOffset = offset;
            this.currentOffset = offset;
            super.seek(partition, offset);
        }

        public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
            if (partitions.size() != 1) {
                throw new IllegalStateException("RestoreConsumer: other than one partition specified");
            }
            for (TopicPartition partition : partitions) {
                if (partition.equals((Object)this.assignedPartition)) continue;
                throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition");
            }
            this.seekToBeginingCalled = true;
            this.currentOffset = 0L;
        }

        public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
            if (partitions.size() != 1) {
                throw new IllegalStateException("RestoreConsumer: other than one partition specified");
            }
            for (TopicPartition partition : partitions) {
                if (partition.equals((Object)this.assignedPartition)) continue;
                throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition");
            }
            this.seekToEndCalled = true;
            this.currentOffset = this.endOffset;
        }
    }
}

