package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StandbyTaskTest.class */
/**
 * Unit tests for {@link StandbyTask}.
 *
 * <p>Each test builds a standby task on top of a {@link MockConsumer} and a
 * {@link MockRestoreConsumer}, feeds it changelog records, and then checks the
 * task's checkpointed offsets and/or the contents of its mock state stores.
 * State is written under a fresh temporary directory that is created in
 * {@link #setup()} and deleted in {@link #cleanup()}.
 */
public class StandbyTaskTest {
    /** Name of the offset checkpoint file the tests read back from a task's state directory. */
    private static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    private File baseDir;
    private StateDirectory stateDirectory;
    private final TaskId taskId = new TaskId(0, 1);
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final String applicationId = "test-application";
    private final String storeName1 = "store1";
    private final String storeName2 = "store2";
    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
    private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1);
    private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1);
    private final Set<TopicPartition> topicPartitions = Collections.emptySet();

    // Topology with two mock stores, each mapped to its own changelog topic.
    // NOTE(review): the boolean passed to MockStateStoreSupplier is presumably the "persistent"
    // flag (store1 = false, store2 = true), which would match the expectations in
    // testStorePartitions and testUpdateNonPersistentStore - confirm against MockStateStoreSupplier.
    private final ProcessorTopology topology = new ProcessorTopology(
        Collections.emptyList(),
        Collections.emptyMap(),
        Collections.emptyMap(),
        Utils.<StateStore>mkList(
            new MockStateStoreSupplier(storeName1, false).get(),
            new MockStateStoreSupplier(storeName2, true).get()),
        changelogTopicsByStore(storeName1, storeChangelogTopicName1, storeName2, storeChangelogTopicName2),
        Collections.emptyList());

    private final TopicPartition ktable = new TopicPartition("ktable1", 0);
    private final Set<TopicPartition> ktablePartitions = Utils.mkSet(ktable);

    // Topology with a single store that is named after the ktable topic and uses that same topic
    // as its changelog.
    private final ProcessorTopology ktableTopology = new ProcessorTopology(
        Collections.emptyList(),
        Collections.emptyMap(),
        Collections.emptyMap(),
        Utils.<StateStore>mkList(new MockStateStoreSupplier(ktable.topic(), true, false).get()),
        changelogTopicsByStore(ktable.topic(), ktable.topic()),
        Collections.emptyList());

    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000);
    private final byte[] recordValue = intSerializer.serialize(null, 10);
    private final byte[] recordKey = intSerializer.serialize(null, 1);

    /**
     * Builds the streams configuration used by every task in this test.
     *
     * @param file directory whose canonical path is used as the {@code state.dir} setting
     * @return a new {@link StreamsConfig} for the test application
     * @throws IOException if the canonical path of {@code file} cannot be resolved
     */
    private StreamsConfig createConfig(final File file) throws IOException {
        final Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", "localhost:2171");
        props.setProperty("buffered.records.per.partition", "3");
        props.setProperty("state.dir", file.getCanonicalPath());
        props.setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    /**
     * Resets the restore consumer, registers three partitions for each store changelog topic and
     * creates a fresh temporary state directory.
     */
    @Before
    public void setup() {
        restoreStateConsumer.reset();
        restoreStateConsumer.updatePartitions(storeChangelogTopicName1, partitionInfos(storeChangelogTopicName1, 3));
        restoreStateConsumer.updatePartitions(storeChangelogTopicName2, partitionInfos(storeChangelogTopicName2, 3));
        baseDir = TestUtils.tempDirectory();
        stateDirectory = new StateDirectory(applicationId, baseDir.getPath(), new MockTime());
    }

    /**
     * Deletes the temporary state directory created by {@link #setup()}.
     *
     * @throws IOException if the directory cannot be deleted
     */
    @After
    public void cleanup() throws IOException {
        Utils.delete(baseDir);
    }

    /** After initialization only the changelog partition of store2 is expected to be checkpointed. */
    @Test
    public void testStorePartitions() throws Exception {
        final StandbyTask task = createStandbyTask(topicPartitions, topology);
        task.initialize();

        Assert.assertEquals(Utils.mkSet(partition2), new HashSet<>(task.checkpointedOffsets().keySet()));
    }

    /** Updating the task with a record for store1's changelog partition is expected to throw. */
    @Test(expected = Exception.class)
    public void testUpdateNonPersistentStore() throws Exception {
        final StandbyTask task = createStandbyTask(topicPartitions, topology);
        restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));

        task.update(
            partition1,
            records(new ConsumerRecord<>(
                partition1.topic(), partition1.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
                recordKey, recordValue)));
    }

    /**
     * Feeds three records (offsets 10, 20, 30) for store2's changelog partition through the task
     * and verifies that store2 received keys 1..3, that store1 stayed empty, and that closing the
     * state manager writes a checkpoint of 31 for that partition.
     */
    @Test
    public void testUpdate() throws Exception {
        final StandbyTask task = createStandbyTask(topicPartitions, topology);
        task.initialize();
        restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));

        bufferSequentialRecords(partition2, 3);
        seekToCheckpointedOffsets(task);
        task.update(partition2, restoreStateConsumer.poll(100L).records(partition2));

        final StandbyContextImpl context = task.context();
        final MockStateStoreSupplier.MockStateStore store1 =
            (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName1);
        final MockStateStoreSupplier.MockStateStore store2 =
            (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName2);
        Assert.assertEquals(Collections.emptyList(), store1.keys);
        Assert.assertEquals(Utils.mkList(1, 2, 3), store2.keys);

        task.closeStateManager(true);

        final Map<TopicPartition, Long> offsets = readCheckpoint(taskId);
        Assert.assertEquals(1, offsets.size());
        Assert.assertEquals(Long.valueOf(31L), offsets.get(partition2));
    }

    /**
     * Feeds five records (offsets 10..50) for the ktable partition and repeatedly advances the
     * main consumer's committed offset, asserting how many records {@code update} hands back
     * each time.
     *
     * <p>The asserted counts are consistent with only those records whose offset is below the
     * committed offset being applied, the rest being returned for a later call.
     */
    @Test
    public void testUpdateKTable() throws Exception {
        consumer.assign(Utils.mkList(ktable));
        commitConsumerOffset(ktable, 0L);
        restoreStateConsumer.updatePartitions(ktable.topic(), partitionInfos(ktable.topic(), 3));

        final StandbyTask task = createStandbyTask(ktablePartitions, ktableTopology);
        task.initialize();
        restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));

        bufferSequentialRecords(ktable, 5);
        seekToCheckpointedOffsets(task);

        // Committed offset is still 0: all five records are handed back.
        List<ConsumerRecord<byte[], byte[]>> remaining =
            task.update(ktable, restoreStateConsumer.poll(100L).records(ktable));
        Assert.assertEquals(5, remaining.size());

        commitConsumerOffset(ktable, 10L);
        task.commit();
        remaining = task.update(ktable, remaining);
        Assert.assertEquals(5, remaining.size());

        commitConsumerOffset(ktable, 11L);
        task.commit();
        remaining = task.update(ktable, remaining);
        Assert.assertEquals(4, remaining.size());

        commitConsumerOffset(ktable, 45L);
        task.commit();
        remaining = task.update(ktable, remaining);
        Assert.assertEquals(1, remaining.size());

        commitConsumerOffset(ktable, 50L);
        task.commit();
        remaining = task.update(ktable, remaining);
        Assert.assertEquals(1, remaining.size());

        // Once the committed offset is past the last record, update() returns null.
        commitConsumerOffset(ktable, 60L);
        task.commit();
        Assert.assertNull(task.update(ktable, remaining));

        task.closeStateManager(true);

        final Map<TopicPartition, Long> offsets = readCheckpoint(taskId);
        Assert.assertEquals(1, offsets.size());
        Assert.assertEquals(Long.valueOf(51L), offsets.get(ktable));
    }

    /**
     * Constructs a standby task from a topology built with {@link KStreamBuilder} (a
     * {@code groupByKey().count("my-store")} aggregation). The test passes if construction
     * completes without throwing.
     */
    @Test
    public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
        final String changelogName = "test-application-my-store-changelog";
        final TopicPartition changelogPartition = new TopicPartition(changelogName, 0);
        final List<TopicPartition> partitions = Utils.mkList(changelogPartition);

        consumer.assign(partitions);
        commitConsumerOffset(changelogPartition, 0L);
        restoreStateConsumer.updatePartitions(changelogName, partitionInfos(changelogName, 1));

        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("topic").groupByKey().count("my-store");
        final ProcessorTopology builtTopology = builder.setApplicationId(applicationId).build(0);

        new StandbyTask(
            taskId, applicationId, partitions, builtTopology, consumer, changelogReader,
            createConfig(baseDir), new MockStreamsMetrics(new Metrics()), stateDirectory);
    }

    /**
     * Applies a single record at offset 50 and verifies that {@code commit()} alone (without
     * closing the state manager) writes a checkpoint of 51 for the ktable partition.
     */
    @Test
    public void shouldCheckpointStoreOffsetsOnCommit() throws Exception {
        consumer.assign(Utils.mkList(ktable));
        commitConsumerOffset(ktable, 100L);
        restoreStateConsumer.updatePartitions(ktable.topic(), partitionInfos(ktable.topic(), 1));

        // Uses its own task id (0_0) rather than the shared taskId field.
        final TaskId checkpointTaskId = new TaskId(0, 0);
        final StandbyTask task = new StandbyTask(
            checkpointTaskId, applicationId, ktablePartitions, ktableTopology, consumer, changelogReader,
            createConfig(baseDir), (StreamsMetrics) null, stateDirectory);
        task.initialize();
        restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));

        final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
        task.update(
            ktable,
            Collections.singletonList(new ConsumerRecord<>(
                ktable.topic(), ktable.partition(), 50L, serializedValue, serializedValue)));

        task.commit();

        MatcherAssert.assertThat(
            readCheckpoint(checkpointTaskId),
            CoreMatchers.equalTo(Collections.singletonMap(ktable, 51L)));
    }

    /**
     * Creates a standby task for the shared {@link #taskId} with no metrics, using the shared
     * consumers, changelog reader and state directory.
     *
     * @param partitions        partitions handed to the task
     * @param processorTopology topology the task should run
     * @return the newly constructed (not yet initialized) task
     * @throws IOException if the streams configuration cannot be created
     */
    private StandbyTask createStandbyTask(final Set<TopicPartition> partitions,
                                          final ProcessorTopology processorTopology) throws IOException {
        return new StandbyTask(
            taskId, applicationId, partitions, processorTopology, consumer, changelogReader,
            createConfig(baseDir), (StreamsMetrics) null, stateDirectory);
    }

    /**
     * Builds a mutable store-name to changelog-topic map from alternating name/topic arguments.
     *
     * @param storeAndTopicPairs store name followed by its changelog topic, repeated
     * @return a new map containing one entry per pair
     */
    private static Map<String, String> changelogTopicsByStore(final String... storeAndTopicPairs) {
        final Map<String, String> topicsByStore = new HashMap<>();
        for (int i = 0; i + 1 < storeAndTopicPairs.length; i += 2) {
            topicsByStore.put(storeAndTopicPairs[i], storeAndTopicPairs[i + 1]);
        }
        return topicsByStore;
    }

    /**
     * Creates partition metadata for partitions {@code 0..partitionCount-1} of a topic, each with
     * no leader node and empty replica arrays.
     *
     * @param topic          topic name
     * @param partitionCount number of partitions to describe
     * @return one {@link PartitionInfo} per partition, in ascending partition order
     */
    private static List<PartitionInfo> partitionInfos(final String topic, final int partitionCount) {
        final List<PartitionInfo> infos = new ArrayList<>(partitionCount);
        for (int partition = 0; partition < partitionCount; partition++) {
            infos.add(new PartitionInfo(topic, partition, Node.noNode(), new Node[0], new Node[0]));
        }
        return infos;
    }

    /**
     * Buffers {@code count} records in the restore consumer for the given partition. Record
     * {@code i} (1-based) has offset {@code i * 10}, key {@code i} and value {@code 100}.
     *
     * @param partition partition the records belong to
     * @param count     number of records to buffer
     */
    private void bufferSequentialRecords(final TopicPartition partition, final int count) {
        for (int key = 1; key <= count; key++) {
            restoreStateConsumer.bufferRecord(new ConsumerRecord<Integer, Integer>(
                partition.topic(), partition.partition(), key * 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
                key, 100));
        }
    }

    /**
     * Positions the restore consumer on every checkpointed partition of the task: at the
     * checkpointed offset when it is non-negative, otherwise at the beginning of the partition.
     *
     * @param task task whose checkpointed offsets drive the seeks
     */
    private void seekToCheckpointedOffsets(final StandbyTask task) {
        for (final Map.Entry<TopicPartition, Long> checkpoint : task.checkpointedOffsets().entrySet()) {
            final TopicPartition partition = checkpoint.getKey();
            final long offset = checkpoint.getValue();
            if (offset >= 0) {
                restoreStateConsumer.seek(partition, offset);
            } else {
                restoreStateConsumer.seekToBeginning(Collections.singleton(partition));
            }
        }
    }

    /**
     * Synchronously commits the given offset for one partition on the main (non-restore) consumer.
     *
     * @param partition partition to commit for
     * @param offset    offset to record as committed
     */
    private void commitConsumerOffset(final TopicPartition partition, final long offset) {
        final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        committed.put(partition, new OffsetAndMetadata(offset));
        consumer.commitSync(committed);
    }

    /**
     * Reads the offset checkpoint file from the state directory of the given task.
     *
     * @param id task whose checkpoint file should be read
     * @return the checkpointed offset per partition
     * @throws IOException if the checkpoint file cannot be read
     */
    private Map<TopicPartition, Long> readCheckpoint(final TaskId id) throws IOException {
        final File checkpointFile = new File(stateDirectory.directoryForTask(id), CHECKPOINT_FILE_NAME);
        return new OffsetCheckpoint(checkpointFile).read();
    }

    /**
     * Wraps the given records in a fixed-size list.
     *
     * @param consumerRecordArr records to wrap
     * @return a list view over {@code consumerRecordArr}
     */
    @SafeVarargs
    private static List<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... consumerRecordArr) {
        return Arrays.asList(consumerRecordArr);
    }
}
