package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTaskTest.class */
public class StreamTaskTest {
    // Serdes used to build the raw record payloads and to configure the mock source nodes.
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
    // Two single-topic arrays; each test partition below is partition 1 of its topic.
    private final String[] topic1 = {"topic1"};
    private final String[] topic2 = {"topic2"};
    private final TopicPartition partition1 = new TopicPartition(this.topic1[0], 1);
    private final TopicPartition partition2 = new TopicPartition(this.topic2[0], 1);
    private final Set<TopicPartition> partitions = Utils.mkSet(new TopicPartition[]{this.partition1, this.partition2});
    // One mock source per topic; both are attached to the shared mock processor in setup().
    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(this.topic1, this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(this.topic2, this.intDeserializer, this.intDeserializer);
    // NOTE(review): the argument 10 is presumably the punctuation interval of the mock processor - confirm against MockProcessorNode.
    private final MockProcessorNode<Integer, Integer> processor = new MockProcessorNode<>(10);
    // Default topology: both sources plus the processor, with each topic name mapped to its source node.
    private final ProcessorTopology topology = new ProcessorTopology(Arrays.asList(this.source1, this.source2, this.processor), new HashMap<String, SourceNode>() { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.1
        {
            put(StreamTaskTest.this.topic1[0], StreamTaskTest.this.source1);
            put(StreamTaskTest.this.topic2[0], StreamTaskTest.this.source2);
        }
    }, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList());
    // Mock clients handed to every StreamTask built by these tests.
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, this.bytesSerializer, this.bytesSerializer);
    // Separate consumer backing the changelog reader; the checkpoint tests seed its partitions and offsets.
    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final StoreChangelogReader changelogReader = new StoreChangelogReader(this.restoreStateConsumer);
    // Serialized payloads shared by all records: value is the integer 10, key is the integer 1.
    private final byte[] recordValue = this.intSerializer.serialize((String) null, 10);
    private final byte[] recordKey = this.intSerializer.serialize((String) null, 1);
    private final String applicationId = "applicationId";
    // Metrics registry wrapped by the mock streams metrics; testMetrics() inspects it directly.
    private final Metrics metrics = new Metrics();
    private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(this.metrics);
    private final TaskId taskId00 = new TaskId(0, 0);
    private final MockTime time = new MockTime();
    // Temporary state directory root; deleted again in cleanup().
    private File baseDir = TestUtils.tempDirectory();
    // The remaining fields are (re)created for every test in setup().
    private StateDirectory stateDirectory;
    private StreamsConfig config;
    private StreamsConfig eosConfig;
    private StreamTask task;

    /**
     * Builds the {@link StreamsConfig} used by the tasks under test.
     *
     * @param z when {@code true}, the processing guarantee is additionally set to {@code exactly_once}
     * @return a config whose state directory points at the canonical path of {@code baseDir}
     * @throws Exception if the canonical path of {@code baseDir} cannot be resolved
     */
    private StreamsConfig createConfig(final boolean z) throws Exception {
        final Properties settings = new Properties();
        settings.setProperty("application.id", "stream-task-test");
        settings.setProperty("bootstrap.servers", "localhost:2171");
        settings.setProperty("buffered.records.per.partition", "3");
        settings.setProperty("state.dir", baseDir.getCanonicalPath());
        settings.setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
        if (z) {
            settings.setProperty("processing.guarantee", "exactly_once");
        }
        return new StreamsConfig(settings);
    }

    /**
     * Creates the configs and state directory, wires both sources to the shared processor,
     * and builds and initializes the default (non-EOS) task before every test.
     */
    @Before
    public void setup() throws Exception {
        // Configuration and on-disk state location.
        config = createConfig(false);
        eosConfig = createConfig(true);
        stateDirectory = new StateDirectory(applicationId, baseDir.getPath(), new MockTime());

        // Consumer assignment and topology wiring.
        consumer.assign(Arrays.asList(partition1, partition2));
        source1.addChild(processor);
        source2.addChild(processor);

        task = new StreamTask(
            taskId00,
            applicationId,
            partitions,
            topology,
            consumer,
            changelogReader,
            config,
            streamsMetrics,
            stateDirectory,
            (ThreadCache) null,
            time,
            producer);
        task.initialize();
    }

    /**
     * Closes the current task, if a test left one in place, and always removes the temporary
     * state directory, even when closing throws.
     */
    @After
    public void cleanup() throws IOException {
        final StreamTask remaining = task;
        try {
            if (remaining != null) {
                remaining.close(true, false);
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * Buffers three records on each partition and checks, one {@code process()} call at a time,
     * how many records remain buffered and which source node received the record. The expected
     * counts show the records being consumed in ascending order of the third constructor
     * argument (10, 20, 25, 30, 35, 45) across both partitions.
     */
    @Test
    public void testProcessOrder() throws Exception {
        final String topicOne = partition1.topic();
        final int partitionOne = partition1.partition();
        final String topicTwo = partition2.topic();
        final int partitionTwo = partition2.partition();

        task.addRecords(partition1, records(
            new ConsumerRecord<>(topicOne, partitionOne, 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicOne, partitionOne, 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicOne, partitionOne, 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
        task.addRecords(partition2, records(
            new ConsumerRecord<>(topicTwo, partitionTwo, 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicTwo, partitionTwo, 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicTwo, partitionTwo, 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));

        // Each row: {records still buffered, received by source1, received by source2}
        // after one more call to process().
        final long[][] expectedAfterEachStep = {
            {5L, 1L, 0L},
            {4L, 2L, 0L},
            {3L, 2L, 1L},
            {2L, 3L, 1L},
            {1L, 3L, 2L},
            {0L, 3L, 3L},
        };
        for (final long[] expected : expectedAfterEachStep) {
            Assert.assertTrue(task.process());
            Assert.assertEquals(expected[0], task.numBuffered());
            Assert.assertEquals(expected[1], source1.numReceived);
            Assert.assertEquals(expected[2], source2.numReceived);
        }
    }

    /**
     * Asserts that the latency-avg, latency-max and rate metrics of an operation are registered.
     *
     * @param str  operation name used as the metric name prefix and inside the descriptions
     * @param str2 metric group name
     * @param map  metric tags
     */
    private void testSpecificMetrics(String str, String str2, Map<String, String> map) {
        // Each entry: {metric name, metric description}.
        final String[][] expectedMetrics = {
            {str + "-latency-avg", "The average latency of " + str + " operation."},
            {str + "-latency-max", "The max latency of " + str + " operation."},
            {str + "-rate", "The average number of occurrence of " + str + " operation per second."},
        };
        for (final String[] nameAndDescription : expectedMetrics) {
            Assert.assertNotNull(metrics.metrics().get(
                metrics.metricName(nameAndDescription[0], str2, nameAndDescription[1], map)));
        }
    }

    /**
     * Verifies that the "commit" sensor exists and that its metrics are registered both under
     * the task's own id and under the aggregate "all" tag.
     */
    @Test
    public void testMetrics() throws Exception {
        // Typed map instead of a raw LinkedHashMap: testSpecificMetrics expects
        // Map<String, String>, so the raw type forced an unchecked conversion at each call.
        final Map<String, String> metricTags = new LinkedHashMap<>();
        metricTags.put("task-id", task.id().toString());

        Assert.assertNotNull(metrics.getSensor("commit"));
        testSpecificMetrics("commit", "stream-task-metrics", metricTags);

        // Re-use the same tag map, now pointing at the aggregate entry.
        metricTags.put("task-id", "all");
        testSpecificMetrics("commit", "stream-task-metrics", metricTags);
    }

    /**
     * Checks which partitions the consumer reports as paused while records are added to and
     * processed from the task.
     * NOTE(review): pausing presumably kicks in once a partition holds more than the 3 buffered
     * records configured in createConfig - confirm against StreamTask.
     */
    @Test
    public void testPauseResume() throws Exception {
        final String topicOne = partition1.topic();
        final int partitionOne = partition1.partition();
        final String topicTwo = partition2.topic();
        final int partitionTwo = partition2.partition();

        // Two records on partition1, four on partition2.
        task.addRecords(partition1, records(
            new ConsumerRecord<>(topicOne, partitionOne, 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicOne, partitionOne, 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
        task.addRecords(partition2, records(
            new ConsumerRecord<>(topicTwo, partitionTwo, 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicTwo, partitionTwo, 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicTwo, partitionTwo, 55L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicTwo, partitionTwo, 65L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));

        // After the first record only partition2 is paused.
        Assert.assertTrue(task.process());
        Assert.assertEquals(1L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        Assert.assertEquals(1L, consumer.paused().size());
        Assert.assertTrue(consumer.paused().contains(partition2));

        // Three more records on partition1: now both partitions are paused.
        task.addRecords(partition1, records(
            new ConsumerRecord<>(topicOne, partitionOne, 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicOne, partitionOne, 40L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicOne, partitionOne, 50L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
        Assert.assertEquals(2L, consumer.paused().size());
        Assert.assertTrue(consumer.paused().contains(partition1));
        Assert.assertTrue(consumer.paused().contains(partition2));

        // Processing one record un-pauses partition1 again.
        Assert.assertTrue(task.process());
        Assert.assertEquals(2L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        Assert.assertEquals(1L, consumer.paused().size());
        Assert.assertTrue(consumer.paused().contains(partition2));

        Assert.assertTrue(task.process());
        Assert.assertEquals(3L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        Assert.assertEquals(1L, consumer.paused().size());
        Assert.assertTrue(consumer.paused().contains(partition2));

        // The first record taken from partition2 leaves nothing paused.
        Assert.assertTrue(task.process());
        Assert.assertEquals(3L, source1.numReceived);
        Assert.assertEquals(1L, source2.numReceived);
        Assert.assertEquals(0L, consumer.paused().size());
    }

    /**
     * Interleaves {@code maybePunctuate()} and {@code process()} over six buffered records and
     * checks that punctuation is reported on every other step, then verifies the punctuation
     * timestamps recorded by the mock processor (20, 30, 40).
     */
    @Test
    public void testMaybePunctuate() throws Exception {
        final String topicOne = partition1.topic();
        final int partitionOne = partition1.partition();
        final String topicTwo = partition2.topic();
        final int partitionTwo = partition2.partition();

        task.addRecords(partition1, records(
            new ConsumerRecord<>(topicOne, partitionOne, 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicOne, partitionOne, 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicOne, partitionOne, 40L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
        task.addRecords(partition2, records(
            new ConsumerRecord<>(topicTwo, partitionTwo, 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicTwo, partitionTwo, 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>(topicTwo, partitionTwo, 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));

        // Each row: {records still buffered, received by source1, received by source2}
        // after the process() call of that step.
        final long[][] expectedAfterEachStep = {
            {5L, 1L, 0L},
            {4L, 1L, 1L},
            {3L, 2L, 1L},
            {2L, 2L, 2L},
            {1L, 3L, 2L},
            {0L, 3L, 3L},
        };
        for (int step = 0; step < expectedAfterEachStep.length; step++) {
            // Punctuation is expected on steps 0, 2 and 4 only.
            final boolean punctuationExpected = step % 2 == 0;
            Assert.assertEquals(punctuationExpected, task.maybePunctuate());

            Assert.assertTrue(task.process());
            final long[] expected = expectedAfterEachStep[step];
            Assert.assertEquals(expected[0], task.numBuffered());
            Assert.assertEquals(expected[1], source1.numReceived);
            Assert.assertEquals(expected[2], source2.numReceived);
        }

        // Nothing left to process or punctuate once the buffers are drained.
        Assert.assertFalse(task.process());
        Assert.assertFalse(task.maybePunctuate());
        processor.supplier.checkAndClearPunctuateResult(20, 30, 40);
    }

    /**
     * Replaces the default task with one whose only source node throws a {@link KafkaException}
     * from {@code process}, and checks that {@code task.process()} surfaces a
     * {@link StreamsException} whose message names the topic, partition, offset and processor.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() throws Exception {
        final MockSourceNode failingSource = new MockSourceNode(topic1, intDeserializer, intDeserializer) {
            @Override
            public void process(Object key, Object value) {
                throw new KafkaException("KABOOM!");
            }
        };

        // Both topics are routed to the failing source.
        final Map<String, SourceNode> sourcesByTopic = new HashMap<>();
        sourcesByTopic.put(topic1[0], failingSource);
        sourcesByTopic.put(topic2[0], failingSource);
        final ProcessorTopology failingTopology = new ProcessorTopology(
            Collections.singletonList(failingSource),
            sourcesByTopic,
            Collections.emptyMap(),
            Collections.emptyList(),
            Collections.emptyMap(),
            Collections.emptyList());

        // Swap the task created in setup() for one running on the failing topology.
        task.close(true, false);
        task = new StreamTask(taskId00, "applicationId", partitions, failingTopology, consumer, changelogReader,
            config, streamsMetrics, stateDirectory, (ThreadCache) null, time, producer);
        task.initialize();

        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            partition1.topic(), partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue);
        task.addRecords(partition1, Collections.singletonList(record));

        try {
            task.process();
            Assert.fail("Should've thrown StreamsException");
        } catch (StreamsException expected) {
            final String message = expected.getMessage();
            Assert.assertTrue("message=" + message + " should contain topic",
                message.contains("topic=" + topic1[0]));
            Assert.assertTrue("message=" + message + " should contain partition",
                message.contains("partition=" + partition1.partition()));
            Assert.assertTrue("message=" + message + " should contain offset",
                message.contains("offset=20"));
            Assert.assertTrue("message=" + message + " should contain processor",
                message.contains("processor=" + failingSource.name()));
        }
    }

    /**
     * Punctuates a processor node whose {@code punctuate} throws a {@link KafkaException} and
     * checks that the resulting {@link StreamsException} names the processor and that the
     * context's current node is null afterwards.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating() throws Exception {
        final AbstractProcessor explodingProcessor = new AbstractProcessor() {
            public void init(ProcessorContext processorContext) {
                processorContext.schedule(1L);
            }

            public void process(Object key, Object value) {
            }

            public void punctuate(long timestamp) {
                throw new KafkaException("KABOOM!");
            }
        };
        final ProcessorNode punctuatingNode = new ProcessorNode("test", explodingProcessor, Collections.emptySet());
        punctuatingNode.init(new NoOpProcessorContext());

        try {
            task.punctuate(punctuatingNode, 1L);
            Assert.fail("Should've thrown StreamsException");
        } catch (StreamsException expected) {
            final String message = expected.getMessage();
            Assert.assertTrue("message=" + message + " should contain processor",
                message.contains("processor 'test'"));
            MatcherAssert.assertThat(task.processorContext().currentNode(), CoreMatchers.nullValue());
        }
    }

    /**
     * Builds a task whose record collector records calls to {@code flush()} and checks that
     * {@code flushState()} reaches it.
     */
    @Test
    public void shouldFlushRecordCollectorOnFlushState() throws Exception {
        final AtomicBoolean collectorFlushed = new AtomicBoolean(false);
        final StreamsMetrics isolatedMetrics = new MockStreamsMetrics(new Metrics());

        final StreamTask flushingTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
            changelogReader, config, isolatedMetrics, stateDirectory, null, time, producer) {
            RecordCollector createRecordCollector() {
                return new NoOpRecordCollector() {
                    @Override
                    public void flush() {
                        collectorFlushed.set(true);
                    }
                };
            }
        };
        flushingTask.flushState();

        Assert.assertTrue(collectorFlushed.get());
    }

    /**
     * Commits a task that owns a persistent store and whose record collector reports offset 543
     * for the store's changelog partition, then checks that the task's ".checkpoint" file maps
     * that partition to 544 (the reported offset plus one).
     */
    @Test
    public void shouldCheckpointOffsetsOnCommit() throws Exception {
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", "test");
        final TopicPartition checkpointedPartition = new TopicPartition(changelogTopic, 0);

        // Store that registers itself with the context and reports itself as persistent.
        final InMemoryKeyValueStore persistentStore = new InMemoryKeyValueStore("test", null, null) {
            public void init(ProcessorContext processorContext, StateStore stateStore) {
                processorContext.register(stateStore, true, (StateRestoreCallback) null);
            }

            public boolean persistent() {
                return true;
            }
        };

        final Map<String, SourceNode> sourcesByTopic = new HashMap<>();
        sourcesByTopic.put(partition1.topic(), source1);
        sourcesByTopic.put(partition2.topic(), source2);
        final ProcessorTopology storeTopology = new ProcessorTopology(
            Collections.emptyList(),
            sourcesByTopic,
            Collections.emptyMap(),
            Collections.singletonList(persistentStore),
            Collections.singletonMap("test", changelogTopic),
            Collections.emptyList());

        // Make the changelog partition known to the restore consumer, with begin and end offset 0.
        restoreStateConsumer.updatePartitions(changelogTopic,
            Collections.singletonList(new PartitionInfo(changelogTopic, 0, (Node) null, new Node[0], new Node[0])));
        restoreStateConsumer.updateEndOffsets(Collections.singletonMap(checkpointedPartition, 0L));
        restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(checkpointedPartition, 0L));

        final StreamTask checkpointingTask = new StreamTask(taskId00, "appId", partitions, storeTopology, consumer,
            changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
            RecordCollector createRecordCollector() {
                return new NoOpRecordCollector() {
                    @Override
                    public Map<TopicPartition, Long> offsets() {
                        return Collections.singletonMap(checkpointedPartition, 543L);
                    }
                };
            }
        };
        checkpointingTask.initialize();

        // Advance the mock clock by one commit interval before committing.
        time.sleep(config.getLong("commit.interval.ms"));
        checkpointingTask.commit();

        final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00), ".checkpoint");
        MatcherAssert.assertThat(
            new OffsetCheckpoint(checkpointFile).read(),
            CoreMatchers.equalTo(Collections.singletonMap(checkpointedPartition, 544L)));
    }

    /**
     * Same scenario as the checkpoint-on-commit test, but with the processing guarantee set to
     * {@code exactly_once}: after the commit no ".checkpoint" file may exist in the task
     * directory.
     */
    @Test
    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() throws Exception {
        // Derive an exactly-once config from the default one.
        final Map<String, Object> eosSettings = config.originals();
        eosSettings.put("processing.guarantee", "exactly_once");
        final StreamsConfig exactlyOnceConfig = new StreamsConfig(eosSettings);

        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", "test");
        final TopicPartition checkpointedPartition = new TopicPartition(changelogTopic, 0);

        // Store that registers itself with the context and reports itself as persistent.
        final InMemoryKeyValueStore persistentStore = new InMemoryKeyValueStore("test", null, null) {
            public void init(ProcessorContext processorContext, StateStore stateStore) {
                processorContext.register(stateStore, true, (StateRestoreCallback) null);
            }

            public boolean persistent() {
                return true;
            }
        };

        final Map<String, SourceNode> sourcesByTopic = new HashMap<>();
        sourcesByTopic.put(partition1.topic(), source1);
        sourcesByTopic.put(partition2.topic(), source2);
        final ProcessorTopology storeTopology = new ProcessorTopology(
            Collections.emptyList(),
            sourcesByTopic,
            Collections.emptyMap(),
            Collections.singletonList(persistentStore),
            Collections.singletonMap("test", changelogTopic),
            Collections.emptyList());

        // Make the changelog partition known to the restore consumer, with begin and end offset 0.
        restoreStateConsumer.updatePartitions(changelogTopic,
            Collections.singletonList(new PartitionInfo(changelogTopic, 0, (Node) null, new Node[0], new Node[0])));
        restoreStateConsumer.updateEndOffsets(Collections.singletonMap(checkpointedPartition, 0L));
        restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(checkpointedPartition, 0L));

        final StreamTask eosTask = new StreamTask(taskId00, "appId", partitions, storeTopology, consumer,
            changelogReader, exactlyOnceConfig, streamsMetrics, stateDirectory, null, time, producer) {
            RecordCollector createRecordCollector() {
                return new NoOpRecordCollector() {
                    @Override
                    public Map<TopicPartition, Long> offsets() {
                        return Collections.singletonMap(checkpointedPartition, 543L);
                    }
                };
            }
        };

        // Advance the mock clock by one commit interval before committing.
        time.sleep(exactlyOnceConfig.getLong("commit.interval.ms"));
        eosTask.commit();

        final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00), ".checkpoint");
        Assert.assertFalse(checkpointFile.exists());
    }

    /**
     * Sets a current node on the processor context and checks that {@code punctuate} then
     * fails with an {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() throws Exception {
        task.processorContext().setCurrentNode(processor);
        try {
            task.punctuate(processor, 10L);
            Assert.fail("Should throw illegal state exception as current node is not null");
        } catch (IllegalStateException expected) {
            // Expected: this exception is exactly what the test is looking for.
        }
    }

    /**
     * Punctuates the mock processor at two timestamps and checks after each call that the
     * processor recorded that timestamp.
     */
    @Test
    public void shouldCallPunctuateOnPassedInProcessorNode() throws Exception {
        for (final long timestamp : new long[] {5L, 10L}) {
            task.punctuate(processor, timestamp);
            MatcherAssert.assertThat(Long.valueOf(processor.punctuatedAt), CoreMatchers.equalTo(timestamp));
        }
    }

    /**
     * Checks that the processor context has no current node once a punctuate call has
     * completed normally.
     */
    @Test
    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() throws Exception {
        task.punctuate(processor, 5L);

        final ProcessorNode nodeAfterPunctuate = task.processorContext().currentNode();
        MatcherAssert.assertThat(nodeAfterPunctuate, CoreMatchers.nullValue());
    }

    /**
     * Scheduling on the task without first setting a current node on the processor context
     * must fail with an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() throws Exception {
        final long interval = 1L;
        task.schedule(interval);
    }

    /**
     * Scheduling must complete without an exception once a current node has been set on the
     * processor context.
     */
    @Test
    public void shouldNotThrowIExceptionOnScheduleIfCurrentNodeIsNotNull() throws Exception {
        task.processorContext().setCurrentNode(processor);

        final long interval = 1L;
        task.schedule(interval);
    }

    /**
     * Replaces the default task with one that throws while closing and checks that
     * {@code close} propagates a {@link RuntimeException} while the processor and both
     * sources still end up closed.
     */
    @Test
    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
        // Dispose of the task built in setup() before swapping in the failing one.
        task.close(true, false);
        task = createTaskThatThrowsExceptionOnClose();
        task.initialize();

        try {
            task.close(true, false);
            Assert.fail("should have thrown runtime exception");
        } catch (RuntimeException expected) {
            // Clear the field so cleanup() does not try to close this task a second time.
            task = null;
        }

        Assert.assertTrue(processor.closed);
        Assert.assertTrue(source1.closed);
        Assert.assertTrue(source2.closed);
    }

    /**
     * Constructing a task with the exactly-once config must leave the producer with
     * transactions initialized and a transaction in flight.
     */
    @Test
    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() throws Exception {
        final MockProducer txProducer = new MockProducer();

        task = new StreamTask(taskId00, "applicationId", partitions, topology, consumer, changelogReader,
            eosConfig, streamsMetrics, stateDirectory, (ThreadCache) null, time, txProducer);

        Assert.assertTrue(txProducer.transactionInitialized());
        Assert.assertTrue(txProducer.transactionInFlight());
    }

    /**
     * Constructing a task with the default config must neither initialize transactions on the
     * producer nor begin one.
     */
    @Test
    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception {
        final MockProducer plainProducer = new MockProducer();

        task = new StreamTask(taskId00, "applicationId", partitions, topology, consumer, changelogReader,
            config, streamsMetrics, stateDirectory, (ThreadCache) null, time, plainProducer);

        Assert.assertFalse(plainProducer.transactionInitialized());
        Assert.assertFalse(plainProducer.transactionInFlight());
    }

    /**
     * With the exactly-once config, suspending a task after it processed one record must send
     * offsets and commit the transaction on the producer, and must leave no transaction in
     * flight.
     */
    @Test
    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() throws Exception {
        final MockProducer txProducer = new MockProducer();
        task = new StreamTask(taskId00, "applicationId", partitions, topology, consumer, changelogReader,
            eosConfig, streamsMetrics, stateDirectory, (ThreadCache) null, time, txProducer);

        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            partition1.topic(), partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue);
        task.addRecords(partition1, Collections.singletonList(record));
        task.process();
        task.suspend();

        Assert.assertTrue(txProducer.sentOffsets());
        Assert.assertTrue(txProducer.transactionCommitted());
        Assert.assertFalse(txProducer.transactionInFlight());
    }

    /**
     * With the exactly-once config, suspending a task that never processed a record must still
     * commit the transaction and leave none in flight.
     */
    @Test
    public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() throws Exception {
        final MockProducer txProducer = new MockProducer();
        task = new StreamTask(taskId00, "applicationId", partitions, topology, consumer, changelogReader,
            eosConfig, streamsMetrics, stateDirectory, (ThreadCache) null, time, txProducer);

        task.suspend();

        Assert.assertTrue(txProducer.transactionCommitted());
        Assert.assertFalse(txProducer.transactionInFlight());
    }

    /**
     * With the default config, suspending a task after it processed one record must not send
     * offsets through the producer, commit a transaction, or leave one in flight.
     */
    @Test
    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() throws Exception {
        final MockProducer plainProducer = new MockProducer();
        task = new StreamTask(taskId00, "applicationId", partitions, topology, consumer, changelogReader,
            config, streamsMetrics, stateDirectory, (ThreadCache) null, time, plainProducer);

        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            partition1.topic(), partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue);
        task.addRecords(partition1, Collections.singletonList(record));
        task.process();
        task.suspend();

        Assert.assertFalse(plainProducer.sentOffsets());
        Assert.assertFalse(plainProducer.transactionCommitted());
        Assert.assertFalse(plainProducer.transactionInFlight());
    }

    /**
     * A task built with {@code eosConfig} processes one record, is suspended and then resumed;
     * after the resume the producer must report a transaction in flight.
     */
    @Test
    public void shouldStartNewTransactionOnResumeIfEosEnabled() throws Exception {
        final MockProducer eosProducer = new MockProducer();
        task = new StreamTask(
                taskId00,
                "applicationId",
                partitions,
                topology,
                consumer,
                changelogReader,
                eosConfig,
                streamsMetrics,
                stateDirectory,
                (ThreadCache) null,
                time,
                eosProducer);

        // Single record at offset 0 on partition1.
        final ConsumerRecord firstRecord = new ConsumerRecord(
                partition1.topic(),
                partition1.partition(),
                0L,
                0L,
                TimestampType.CREATE_TIME,
                0L,
                0,
                0,
                recordKey,
                recordValue);
        task.addRecords(partition1, Collections.singletonList(firstRecord));
        task.process();

        task.suspend();
        task.resume();

        Assert.assertTrue(eosProducer.transactionInFlight());
    }

    /**
     * A task built with the plain {@code config} processes one record, is suspended and then
     * resumed; the producer must not report any transaction in flight afterwards.
     */
    @Test
    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() throws Exception {
        final MockProducer plainProducer = new MockProducer();
        task = new StreamTask(
                taskId00,
                "applicationId",
                partitions,
                topology,
                consumer,
                changelogReader,
                config,
                streamsMetrics,
                stateDirectory,
                (ThreadCache) null,
                time,
                plainProducer);

        // Single record at offset 0 on partition1.
        final ConsumerRecord firstRecord = new ConsumerRecord(
                partition1.topic(),
                partition1.partition(),
                0L,
                0L,
                TimestampType.CREATE_TIME,
                0L,
                0,
                0,
                recordKey,
                recordValue);
        task.addRecords(partition1, Collections.singletonList(firstRecord));
        task.process();

        task.suspend();
        task.resume();

        Assert.assertFalse(plainProducer.transactionInFlight());
    }

    /**
     * A task built with {@code eosConfig} processes one record and commits; after the commit
     * the producer must report a transaction in flight.
     */
    @Test
    public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception {
        final MockProducer eosProducer = new MockProducer();
        task = new StreamTask(
                taskId00,
                "applicationId",
                partitions,
                topology,
                consumer,
                changelogReader,
                eosConfig,
                streamsMetrics,
                stateDirectory,
                (ThreadCache) null,
                time,
                eosProducer);

        // Single record at offset 0 on partition1.
        final ConsumerRecord firstRecord = new ConsumerRecord(
                partition1.topic(),
                partition1.partition(),
                0L,
                0L,
                TimestampType.CREATE_TIME,
                0L,
                0,
                0,
                recordKey,
                recordValue);
        task.addRecords(partition1, Collections.singletonList(firstRecord));
        task.process();
        task.commit();

        Assert.assertTrue(eosProducer.transactionInFlight());
    }

    /**
     * A task built with the plain {@code config} processes one record and commits; the
     * producer must not report any transaction in flight afterwards.
     */
    @Test
    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() throws Exception {
        final MockProducer plainProducer = new MockProducer();
        task = new StreamTask(
                taskId00,
                "applicationId",
                partitions,
                topology,
                consumer,
                changelogReader,
                config,
                streamsMetrics,
                stateDirectory,
                (ThreadCache) null,
                time,
                plainProducer);

        // Single record at offset 0 on partition1.
        final ConsumerRecord firstRecord = new ConsumerRecord(
                partition1.topic(),
                partition1.partition(),
                0L,
                0L,
                TimestampType.CREATE_TIME,
                0L,
                0,
                0,
                recordKey,
                recordValue);
        task.addRecords(partition1, Collections.singletonList(firstRecord));
        task.process();
        task.commit();

        Assert.assertFalse(plainProducer.transactionInFlight());
    }

    /**
     * A task built with {@code eosConfig} and closed via {@code close(false, false)} must
     * leave its producer reporting an aborted transaction.
     */
    @Test
    public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() throws Exception {
        final MockProducer eosProducer = new MockProducer();
        task = new StreamTask(
                taskId00,
                "applicationId",
                partitions,
                topology,
                consumer,
                changelogReader,
                eosConfig,
                streamsMetrics,
                stateDirectory,
                (ThreadCache) null,
                time,
                eosProducer);

        task.close(false, false);
        // The field is cleared once the task has been closed here.
        task = null;

        Assert.assertTrue(eosProducer.transactionAborted());
    }

    /**
     * A task built with {@code eosConfig} and closed via {@code close(false, true)} must not
     * cause its producer to report an aborted transaction.
     */
    @Test
    public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() throws Exception {
        final MockProducer eosProducer = new MockProducer();
        task = new StreamTask(
                taskId00,
                "applicationId",
                partitions,
                topology,
                consumer,
                changelogReader,
                eosConfig,
                streamsMetrics,
                stateDirectory,
                (ThreadCache) null,
                time,
                eosProducer);

        // Second flag is true here, unlike the dirty-close test for the same configuration.
        task.close(false, true);
        task = null;

        Assert.assertFalse(eosProducer.transactionAborted());
    }

    /**
     * A task built with the plain {@code config} and closed via {@code close(false, false)}
     * must not cause its producer to report an aborted transaction.
     */
    @Test
    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception {
        MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(this.taskId00, "applicationId", this.partitions, this.topology, this.consumer, this.changelogReader, this.config, this.streamsMetrics, this.stateDirectory, (ThreadCache) null, this.time, mockProducer);
        this.task.close(false, false);
        // Clear the field after closing, as the other close tests in this class do.
        // NOTE(review): presumably this keeps fixture teardown from touching an already-closed
        // task; the teardown is not visible from here, confirm.
        this.task = null;
        Assert.assertFalse(mockProducer.transactionAborted());
    }

    /**
     * A task built with {@code eosConfig} and closed via {@code close(true, false)} must leave
     * its producer reporting that it has been closed.
     */
    @Test
    public void shouldCloseProducerOnCloseWhenEosEnabled() throws Exception {
        final MockProducer eosProducer = new MockProducer();
        task = new StreamTask(
                taskId00,
                "applicationId",
                partitions,
                topology,
                consumer,
                changelogReader,
                eosConfig,
                streamsMetrics,
                stateDirectory,
                (ThreadCache) null,
                time,
                eosProducer);

        task.close(true, false);
        // The field is cleared once the task has been closed here.
        task = null;

        Assert.assertTrue(eosProducer.closed());
    }

    /**
     * Closes a task that this test never initializes. The task comes from
     * {@link #createTaskThatThrowsExceptionOnClose()}, whose source node throws from
     * {@code close()}, so any exception escaping {@code close(true, false)} is reported as a
     * test failure.
     */
    @Test
    public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
        try {
            createTaskThatThrowsExceptionOnClose().close(true, false);
        } catch (Exception e) {
            // Same failure type and message as Assert.fail, but the caught exception is kept as
            // the cause so the report shows what was actually thrown and from where.
            throw new AssertionError("should have not closed unitialized topology", e);
        }
    }

    /**
     * Builds a task over a topology with one mock store named "store" whose fifth
     * {@code ProcessorTopology} constructor argument is an empty map, and expects
     * {@code initialize()} to return {@code true}.
     */
    @Test
    public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
        // NOTE(review): the fifth argument looks like the store-to-changelog mapping (the test
        // name speaks of changelog partitions); confirm against ProcessorTopology.
        final ProcessorTopology storeTopology = new ProcessorTopology(
                Collections.singletonList(source1),
                Collections.singletonMap(topic1[0], source1),
                Collections.emptyMap(),
                Collections.singletonList(new MockStateStoreSupplier.MockStateStore("store", false)),
                Collections.emptyMap(),
                Collections.emptyList());

        final StreamTask streamTask = new StreamTask(
                taskId00,
                "applicationId",
                Utils.mkSet(new TopicPartition[]{partition1}),
                storeTopology,
                consumer,
                changelogReader,
                config,
                streamsMetrics,
                stateDirectory,
                (ThreadCache) null,
                time,
                producer);

        Assert.assertTrue(streamTask.initialize());
    }

    /**
     * Builds a task over a topology with one mock store named "store" whose fifth
     * {@code ProcessorTopology} constructor argument maps "store" to "changelog", and expects
     * {@code initialize()} to return {@code false}.
     */
    @Test
    public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
        // NOTE(review): the fifth argument looks like the store-to-changelog mapping (the test
        // name speaks of changelog partitions); confirm against ProcessorTopology.
        final ProcessorTopology storeTopology = new ProcessorTopology(
                Collections.singletonList(source1),
                Collections.singletonMap(topic1[0], source1),
                Collections.emptyMap(),
                Collections.singletonList(new MockStateStoreSupplier.MockStateStore("store", false)),
                Collections.singletonMap("store", "changelog"),
                Collections.emptyList());

        final StreamTask streamTask = new StreamTask(
                taskId00,
                "applicationId",
                Utils.mkSet(new TopicPartition[]{partition1}),
                storeTopology,
                consumer,
                changelogReader,
                config,
                streamsMetrics,
                stateDirectory,
                (ThreadCache) null,
                time,
                producer);

        Assert.assertFalse(streamTask.initialize());
    }

    /**
     * Builds a task whose topology maps both {@code topic1[0]} and {@code topic2[0]} to a
     * source node that throws {@code RuntimeException("KABOOM!")} from {@code close()}.
     * The task is created with the plain {@code config} and the fixture's shared producer.
     *
     * @return a new, not yet initialized {@link StreamTask} over that topology
     */
    private StreamTask createTaskThatThrowsExceptionOnClose() {
        final ProcessorNode explodingNode = new MockSourceNode(this.topic1, this.intDeserializer, this.intDeserializer) {
            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };

        // A plain map filled with two puts replaces the former anonymous HashMap subclass with
        // an instance initializer; the contents are the same, without an extra class that
        // captures the enclosing test instance.
        final Map sourcesByTopic = new HashMap();
        sourcesByTopic.put(this.topic1[0], explodingNode);
        sourcesByTopic.put(this.topic2[0], explodingNode);

        final ProcessorTopology explodingTopology = new ProcessorTopology(
                Arrays.asList(explodingNode, this.processor, this.source1, this.source2),
                sourcesByTopic,
                Collections.emptyMap(),
                Collections.emptyList(),
                Collections.emptyMap(),
                Collections.emptyList());

        return new StreamTask(
                this.taskId00,
                "applicationId",
                this.partitions,
                explodingTopology,
                this.consumer,
                this.changelogReader,
                this.config,
                this.streamsMetrics,
                this.stateDirectory,
                (ThreadCache) null,
                this.time,
                this.producer);
    }

    /**
     * Exposes the given consumer records as an {@link Iterable}, in argument order.
     *
     * @param consumerRecordArr the records to expose
     * @return a fixed-size list view over {@code consumerRecordArr}
     */
    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... consumerRecordArr) {
        final Iterable<ConsumerRecord<byte[], byte[]>> recordView = Arrays.asList(consumerRecordArr);
        return recordView;
    }
}
