/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyContextImpl;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockStateStore;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StandbyTaskTest {
    // Task id used by most tests; shouldCheckpointStoreOffsetsOnCommit declares its own local id instead.
    private final TaskId taskId = new TaskId(0, 1);
    // Only used below to produce the raw bytes held in recordKey / recordValue.
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    // Name constants. The same literals are repeated inline throughout this class rather than
    // referencing these fields, so keep both in sync when changing a value.
    private final String applicationId = "test-application";
    private final String storeName1 = "store1";
    private final String storeName2 = "store2";
    // Changelog topic names derived from the application id and the store name.
    // NOTE: these must stay declared before partition1/partition2 and topology, whose
    // initializers read them (instance fields are initialized in declaration order).
    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"store1");
    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"store2");
    private final String globalStoreName = "ktable1";
    // Partition 1 of each store's changelog topic.
    private final TopicPartition partition1 = new TopicPartition(this.storeChangelogTopicName1, 1);
    private final TopicPartition partition2 = new TopicPartition(this.storeChangelogTopicName2, 1);
    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    // Source partitions handed to the task in the store-only tests: none.
    private final Set<TopicPartition> topicPartitions = Collections.emptySet();
    // Topology with two mock stores mapped to their changelog topics. "store1" is created with
    // flag false and "store2" with flag true.
    // NOTE(review): the flag presumably marks the store as persistent (testUpdateNonPersistentStore
    // targets store1's changelog) — confirm against MockStateStoreSupplier.
    private final ProcessorTopology topology = ProcessorTopology.withLocalStores((List)Utils.mkList((Object[])new StateStore[]{new MockStateStoreSupplier("store1", false).get(), new MockStateStoreSupplier("store2", true).get()}), (Map)new HashMap<String, String>(){
        {
            this.put("store1", StandbyTaskTest.this.storeChangelogTopicName1);
            this.put("store2", StandbyTaskTest.this.storeChangelogTopicName2);
        }
    });
    // Partition 0 of topic "ktable1"; used both as the task's source partition and as the
    // changelog of the single store in ktableTopology.
    private final TopicPartition globalTopicPartition = new TopicPartition("ktable1", 0);
    private final Set<TopicPartition> ktablePartitions = Utils.mkSet((Object[])new TopicPartition[]{this.globalTopicPartition});
    // Topology with one mock store named after the "ktable1" topic, whose changelog is that same topic.
    // NOTE(review): the meaning of the (true, false) supplier flags is not visible here — confirm
    // against MockStateStoreSupplier.
    private final ProcessorTopology ktableTopology = ProcessorTopology.withLocalStores(Collections.singletonList(new MockStateStoreSupplier(this.globalTopicPartition.topic(), true, false).get()), (Map)new HashMap<String, String>(){
        {
            this.put("ktable1", StandbyTaskTest.this.globalTopicPartition.topic());
        }
    });
    // Both are (re)assigned in setup() before every test; baseDir is deleted again in cleanup().
    private File baseDir;
    private StateDirectory stateDirectory;
    // Main consumer passed to every StandbyTask; tests commit offsets through it.
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    // Restore consumer that tests feed with changelog records; it also backs changelogReader.
    private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
    private final StoreChangelogReader changelogReader = new StoreChangelogReader((Consumer)this.restoreStateConsumer, (StateRestoreListener)this.stateRestoreListener, new LogContext("standby-task-test "));
    // Serialized integers 10 (value) and 1 (key), used to build a raw byte[] record.
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);

    /**
     * Builds the {@link StreamsConfig} used by the tests in this class.
     *
     * @param baseDir directory whose canonical path becomes the {@code state.dir} setting
     * @return a config for application {@code test-application} with the mock timestamp extractor
     * @throws IOException if the canonical path of {@code baseDir} cannot be resolved
     */
    private StreamsConfig createConfig(final File baseDir) throws IOException {
        final Properties settings = new Properties();
        settings.setProperty("application.id", "test-application");
        settings.setProperty("bootstrap.servers", "localhost:2171");
        settings.setProperty("buffered.records.per.partition", "3");
        settings.setProperty("state.dir", baseDir.getCanonicalPath());
        settings.setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
        return new StreamsConfig(settings);
    }

    /**
     * Resets the restore consumer, registers partitions 0-2 for both store changelog topics,
     * and creates a fresh temporary state directory for the test about to run.
     */
    @Before
    public void setup() throws Exception {
        this.restoreStateConsumer.reset();

        final String[] changelogTopics = {this.storeChangelogTopicName1, this.storeChangelogTopicName2};
        for (final String changelogTopic : changelogTopics) {
            final PartitionInfo[] partitionInfos = new PartitionInfo[3];
            for (int partition = 0; partition < partitionInfos.length; ++partition) {
                partitionInfos[partition] = new PartitionInfo(changelogTopic, partition, Node.noNode(), new Node[0], new Node[0]);
            }
            this.restoreStateConsumer.updatePartitions(changelogTopic, Utils.mkList(partitionInfos));
        }

        this.baseDir = TestUtils.tempDirectory();
        final StreamsConfig config = this.createConfig(this.baseDir);
        this.stateDirectory = new StateDirectory(config, new MockTime());
    }

    /**
     * Deletes the temporary directory that {@code setup()} assigned to {@code baseDir}.
     */
    @After
    public void cleanup() throws IOException {
        final File stateRoot = this.baseDir;
        Utils.delete(stateRoot);
    }

    /**
     * After initializing its state stores, the task must report checkpointed offsets for
     * exactly the two store changelog partitions.
     */
    @Test
    public void testStorePartitions() throws IOException {
        final StreamsConfig streamsConfig = this.createConfig(this.baseDir);
        final StandbyTask standby = new StandbyTask(
                this.taskId, this.topicPartitions, this.topology, this.consumer,
                this.changelogReader, streamsConfig, null, this.stateDirectory);
        standby.initializeStateStores();

        final Set<TopicPartition> expectedPartitions = Utils.mkSet(this.partition2, this.partition1);
        final Set<TopicPartition> actualPartitions =
                new HashSet<TopicPartition>(standby.checkpointedOffsets().keySet());
        Assert.assertEquals(expectedPartitions, actualPartitions);
    }

    /**
     * Feeding a record for store1's changelog partition into a task whose state stores were
     * never initialized is expected to raise a {@link ProcessorStateException}.
     */
    @Test(expected=ProcessorStateException.class)
    public void testUpdateNonPersistentStore() throws IOException {
        final StreamsConfig streamsConfig = this.createConfig(this.baseDir);
        final StandbyTask standby = new StandbyTask(
                this.taskId, this.topicPartitions, this.topology, this.consumer,
                this.changelogReader, streamsConfig, null, this.stateDirectory);

        final List<TopicPartition> checkpointedPartitions =
                new ArrayList<TopicPartition>(standby.checkpointedOffsets().keySet());
        this.restoreStateConsumer.assign(checkpointedPartitions);

        // A single raw record at offset 10 carrying the pre-serialized key and value.
        final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<byte[], byte[]>(
                this.partition1.topic(), this.partition1.partition(), 10L, 0L,
                TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        standby.update(this.partition1, this.records(rawRecord));
    }

    /**
     * Buffers three records (keys 1-3 at offsets 10, 20, 30) on store2's changelog partition,
     * applies them to the standby task, and verifies that only store2 received the keys and
     * that closing the state manager checkpoints offset 31 for that partition.
     */
    @Test
    public void testUpdate() throws IOException {
        final StreamsConfig config = this.createConfig(this.baseDir);
        final StandbyTask task = new StandbyTask(
                this.taskId, this.topicPartitions, this.topology, this.consumer,
                this.changelogReader, config, null, this.stateDirectory);
        task.initializeStateStores();

        final Set<TopicPartition> changelogPartitions = Collections.singleton(this.partition2);
        this.restoreStateConsumer.assign(changelogPartitions);
        for (int key = 1; key <= 3; ++key) {
            // key k is stored at offset 10 * k with the constant value 100
            this.restoreStateConsumer.bufferRecord(new ConsumerRecord<Integer, Integer>(
                    this.partition2.topic(), this.partition2.partition(), 10L * key, 0L,
                    TimestampType.CREATE_TIME, 0L, 0, 0, key, 100));
        }
        this.restoreStateConsumer.seekToBeginning(changelogPartitions);
        task.update(this.partition2, this.restoreStateConsumer.poll(100L).records(this.partition2));

        final StandbyContextImpl context = (StandbyContextImpl)task.context();
        final MockStateStore store1 = (MockStateStore)context.getStateMgr().getStore("store1");
        final MockStateStore store2 = (MockStateStore)context.getStateMgr().getStore("store2");
        Assert.assertEquals(Collections.emptyList(), store1.keys);
        Assert.assertEquals(Utils.mkList(1, 2, 3), store2.keys);

        // Passing true asks the state manager to write the checkpoint file that is read back below.
        task.closeStateManager(true);

        final File taskDir = this.stateDirectory.directoryForTask(this.taskId);
        final Map<TopicPartition, Long> offsets = new OffsetCheckpoint(new File(taskDir, ".checkpoint")).read();
        Assert.assertEquals(1, offsets.size());
        // Long.valueOf instead of the deprecated boxing constructor new Long(..).
        Assert.assertEquals(Long.valueOf(31L), offsets.get(this.partition2));
    }

    /**
     * Drives a standby task whose store changelog is the source topic {@code ktable1} itself.
     *
     * <p>Five records (keys 1-5 at offsets 10, 20, ..., 50) are buffered. The test then
     * repeatedly raises the offset committed on the main consumer, commits the task, re-offers
     * the still-pending records and checks how many the task hands back. Finally it verifies
     * that closing the state manager checkpoints offset 51.
     */
    @Test
    public void testUpdateKTable() throws IOException {
        this.consumer.assign(Utils.mkList(this.globalTopicPartition));
        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        committedOffsets.put(this.globalTopicPartition, new OffsetAndMetadata(0L));
        this.consumer.commitSync(committedOffsets);

        final PartitionInfo[] ktablePartitionInfos = new PartitionInfo[3];
        for (int partition = 0; partition < ktablePartitionInfos.length; ++partition) {
            ktablePartitionInfos[partition] = new PartitionInfo("ktable1", partition, Node.noNode(), new Node[0], new Node[0]);
        }
        this.restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(ktablePartitionInfos));

        final StreamsConfig config = this.createConfig(this.baseDir);
        final StandbyTask task = new StandbyTask(
                this.taskId, this.ktablePartitions, this.ktableTopology, this.consumer,
                this.changelogReader, config, null, this.stateDirectory);
        task.initializeStateStores();
        this.restoreStateConsumer.assign(new ArrayList<TopicPartition>(task.checkpointedOffsets().keySet()));

        for (int key = 1; key <= 5; ++key) {
            // key k is stored at offset 10 * k with the constant value 100
            this.restoreStateConsumer.bufferRecord(new ConsumerRecord<Integer, Integer>(
                    this.globalTopicPartition.topic(), this.globalTopicPartition.partition(), 10L * key, 0L,
                    TimestampType.CREATE_TIME, 0L, 0, 0, key, 100));
        }

        // Position the restore consumer at each checkpointed offset; a negative offset means
        // "start from the beginning of the partition".
        for (final Map.Entry<TopicPartition, Long> checkpointed : task.checkpointedOffsets().entrySet()) {
            final TopicPartition partition = checkpointed.getKey();
            final long offset = checkpointed.getValue();
            if (offset >= 0L) {
                this.restoreStateConsumer.seek(partition, offset);
            } else {
                this.restoreStateConsumer.seekToBeginning(Collections.singleton(partition));
            }
        }

        // With offset 0 committed, all five records are handed back.
        List<ConsumerRecord<byte[], byte[]>> remaining =
                task.update(this.globalTopicPartition, this.restoreStateConsumer.poll(100L).records(this.globalTopicPartition));
        Assert.assertEquals(5, remaining.size());

        // committedUpTo[i] is committed on the main consumer, after which expectedRemaining[i]
        // records must still be handed back by update().
        final long[] committedUpTo = {10L, 11L, 45L, 50L};
        final int[] expectedRemaining = {5, 4, 1, 1};
        for (int step = 0; step < committedUpTo.length; ++step) {
            committedOffsets.put(this.globalTopicPartition, new OffsetAndMetadata(committedUpTo[step]));
            this.consumer.commitSync(committedOffsets);
            task.commit();
            remaining = task.update(this.globalTopicPartition, remaining);
            Assert.assertEquals("records handed back after committing offset " + committedUpTo[step],
                    expectedRemaining[step], remaining.size());
        }

        // Once the committed offset (60) is past the last record (50), update() returns null.
        committedOffsets.put(this.globalTopicPartition, new OffsetAndMetadata(60L));
        this.consumer.commitSync(committedOffsets);
        task.commit();
        remaining = task.update(this.globalTopicPartition, remaining);
        Assert.assertNull(remaining);

        task.closeStateManager(true);

        final File taskDir = this.stateDirectory.directoryForTask(this.taskId);
        final Map<TopicPartition, Long> offsets = new OffsetCheckpoint(new File(taskDir, ".checkpoint")).read();
        Assert.assertEquals(1, offsets.size());
        // Long.valueOf instead of the deprecated boxing constructor new Long(..).
        Assert.assertEquals(Long.valueOf(51L), offsets.get(this.globalTopicPartition));
    }

    /**
     * Constructs a standby task over a topology built from
     * {@code stream("topic").groupByKey().count()}; the test passes as long as construction
     * does not throw.
     */
    @Test
    public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws IOException {
        final String changelogName = "test-application-my-store-changelog";
        final TopicPartition changelogPartition = new TopicPartition(changelogName, 0);
        final List<TopicPartition> partitions = Utils.mkList(changelogPartition);
        this.consumer.assign(partitions);

        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        committedOffsets.put(changelogPartition, new OffsetAndMetadata(0L));
        this.consumer.commitSync(committedOffsets);

        final PartitionInfo changelogInfo = new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0]);
        this.restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(changelogInfo));

        final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
        builder.stream(Collections.singleton("topic"), new ConsumedInternal()).groupByKey().count();

        final StreamsConfig config = this.createConfig(this.baseDir);
        final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
        final ProcessorTopology topology = internalTopologyBuilder
                .setApplicationId("test-application")
                .build(Integer.valueOf(0));

        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
        new StandbyTask(
                this.taskId, partitions, topology, this.consumer,
                this.changelogReader, config, streamsMetrics, this.stateDirectory);
    }

    /**
     * After a record at offset 50 has been applied to the ktable store, {@code commit()} must
     * write a checkpoint file mapping the ktable partition to offset 51.
     */
    @Test
    public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
        this.consumer.assign(Utils.mkList(this.globalTopicPartition));
        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        committedOffsets.put(this.globalTopicPartition, new OffsetAndMetadata(100L));
        this.consumer.commitSync(committedOffsets);

        final PartitionInfo ktableInfo = new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]);
        this.restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(ktableInfo));

        // This test uses its own task id (0_0) rather than the class-level taskId field.
        final TaskId checkpointTaskId = new TaskId(0, 0);
        // NOTE: this clock is local to the test and is not handed to the task in this method.
        final MockTime time = new MockTime();
        final StreamsConfig config = this.createConfig(this.baseDir);
        final StandbyTask task = new StandbyTask(
                checkpointTaskId, this.ktablePartitions, this.ktableTopology, this.consumer,
                this.changelogReader, config, null, this.stateDirectory);
        task.initializeStateStores();
        this.restoreStateConsumer.assign(new ArrayList<TopicPartition>(task.checkpointedOffsets().keySet()));

        // The same serialized integer 1 serves as both key and value of the single record.
        final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<byte[], byte[]>(
                this.globalTopicPartition.topic(), this.globalTopicPartition.partition(), 50L,
                serializedValue, serializedValue);
        task.update(this.globalTopicPartition, Collections.singletonList(record));

        time.sleep(config.getLong("commit.interval.ms").longValue());
        task.commit();

        final File checkpointFile = new File(this.stateDirectory.directoryForTask(checkpointTaskId), ".checkpoint");
        final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint(checkpointFile).read();
        MatcherAssert.assertThat(checkpoint, CoreMatchers.equalTo(Collections.singletonMap(this.globalTopicPartition, 51L)));
    }

    /**
     * Verifies that {@code close(true, false)} still invokes {@code closeStateManager} when
     * {@code commit()} throws. Both methods are overridden in an anonymous subclass: the
     * former always throws, the latter only records that it was called.
     */
    @Test
    public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
        this.consumer.assign(Utils.mkList(this.globalTopicPartition));
        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        committedOffsets.put(this.globalTopicPartition, new OffsetAndMetadata(100L));
        this.consumer.commitSync(committedOffsets);

        final PartitionInfo ktableInfo = new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]);
        this.restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(ktableInfo));

        final StreamsConfig config = this.createConfig(this.baseDir);
        final AtomicBoolean closedStateManager = new AtomicBoolean(false);
        final StandbyTask task = new StandbyTask(
                this.taskId, this.ktablePartitions, this.ktableTopology, this.consumer,
                this.changelogReader, config, null, this.stateDirectory) {

            // @Override makes the compiler verify that these really replace the task's methods;
            // without it a signature drift would silently turn them into unrelated overloads.
            @Override
            public void commit() {
                throw new RuntimeException("KABOOM!");
            }

            @Override
            void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
                closedStateManager.set(true);
            }
        };
        task.initializeStateStores();

        try {
            task.close(true, false);
            // fail() raises an AssertionError, which the catch clause below does not intercept.
            Assert.fail("should have thrown exception");
        } catch (final Exception expected) {
            // Intentionally ignored: commit() above always throws, so close() is expected to fail.
            // What matters is the side effect asserted below.
        }
        Assert.assertTrue(closedStateManager.get());
    }

    /**
     * Wraps the given raw records in a list, preserving argument order.
     *
     * @param recs the records to wrap
     * @return a fixed-size list view backed by the varargs array
     */
    private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> ... recs) {
        final List<ConsumerRecord<byte[], byte[]>> wrapped = Arrays.asList(recs);
        return wrapped;
    }
}

