/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamTaskTest {
    private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
    private final Serializer<byte[]> bytesSerializer = Serdes.ByteArray().serializer();
    private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final TopicPartition partition1 = new TopicPartition("topic1", 1);
    private final TopicPartition partition2 = new TopicPartition("topic2", 1);
    private final Set<TopicPartition> partitions = Utils.mkSet((Object[])new TopicPartition[]{this.partition1, this.partition2});
    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<Integer, Integer>(new String[]{"topic1"}, this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<Integer, Integer>(new String[]{"topic2"}, this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(new String[]{"topic2"}, this.intDeserializer, this.intDeserializer){

        @Override
        public void process(Integer key, Integer value) {
            throw new RuntimeException("KABOOM!");
        }

        @Override
        public void close() {
            throw new RuntimeException("KABOOM!");
        }
    };
    private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode(10L);
    private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode(10L, PunctuationType.WALL_CLOCK_TIME);
    private final String storeName = "store";
    private final StateStore stateStore = new MockKeyValueStore("store", false);
    private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 0);
    private final Long offset = 543L;
    private final ProcessorTopology topology = StreamTaskTest.withSources(Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source2, this.processorStreamTime, this.processorSystemTime}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}));
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private MockProducer<byte[], byte[]> producer;
    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    private final StoreChangelogReader changelogReader = new StoreChangelogReader((Consumer)this.restoreStateConsumer, Duration.ZERO, this.stateRestoreListener, new LogContext("stream-task-test ")){

        public Map<TopicPartition, Long> restoredOffsets() {
            return Collections.singletonMap(StreamTaskTest.this.changelogPartition, StreamTaskTest.this.offset);
        }
    };
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);
    private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(this.metrics);
    private final TaskId taskId00 = new TaskId(0, 0);
    private final MockTime time = new MockTime();
    private final File baseDir = TestUtils.tempDirectory();
    private StateDirectory stateDirectory;
    private StreamTask task;
    private long punctuatedAt;
    private final Punctuator punctuator = new Punctuator(){

        public void punctuate(long timestamp) {
            StreamTaskTest.this.punctuatedAt = timestamp;
        }
    };

    static ProcessorTopology withRepartitionTopics(List<ProcessorNode> processorNodes, Map<String, SourceNode> sourcesByTopic, Set<String> repartitionTopics) {
        return new ProcessorTopology(processorNodes, sourcesByTopic, Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), repartitionTopics);
    }

    static ProcessorTopology withSources(List<ProcessorNode> processorNodes, Map<String, SourceNode> sourcesByTopic) {
        return new ProcessorTopology(processorNodes, sourcesByTopic, Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());
    }

    private StreamsConfig createConfig(boolean enableEoS) {
        String canonicalPath;
        try {
            canonicalPath = this.baseDir.getCanonicalPath();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return new StreamsConfig((Map)Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"stream-task-test"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"localhost:2171"), Utils.mkEntry((Object)"buffered.records.per.partition", (Object)"3"), Utils.mkEntry((Object)"state.dir", (Object)canonicalPath), Utils.mkEntry((Object)"default.timestamp.extractor", (Object)MockTimestampExtractor.class.getName()), Utils.mkEntry((Object)"processing.guarantee", (Object)(enableEoS ? "exactly_once" : "at_least_once")), Utils.mkEntry((Object)"max.task.idle.ms", (Object)"100")})));
    }

    @Before
    public void setup() {
        this.consumer.assign(Arrays.asList(this.partition1, this.partition2));
        this.stateDirectory = new StateDirectory(this.createConfig(false), (Time)new MockTime());
    }

    @After
    public void cleanup() throws IOException {
        try {
            if (this.task != null) {
                try {
                    this.task.close(true, false);
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
        finally {
            Utils.delete((File)this.baseDir);
        }
    }

    @Test
    public void shouldHandleInitTransactionsTimeoutExceptionOnCreation() {
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        ProcessorTopology topology = StreamTaskTest.withSources(Arrays.asList(new ProcessorNode[]{this.source1, this.source2, this.processorStreamTime, this.processorSystemTime}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}));
        this.source1.addChild(this.processorStreamTime);
        this.source2.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source2.addChild(this.processorSystemTime);
        try {
            new StreamTask(this.taskId00, this.partitions, topology, this.consumer, (ChangelogReader)this.changelogReader, this.createConfig(true), this.streamsMetrics, this.stateDirectory, null, (Time)this.time, () -> {
                this.producer = new MockProducer<byte[], byte[]>(false, this.bytesSerializer, this.bytesSerializer){

                    public void initTransactions() {
                        throw new TimeoutException("test");
                    }
                };
                return this.producer;
            }, null, null);
            Assert.fail((String)"Expected an exception");
        }
        catch (StreamsException expected) {
            this.assertTimeoutErrorLog(appender);
            MatcherAssert.assertThat((Object)expected.getMessage(), (Matcher)CoreMatchers.is((Object)"task [0_0] Failed to initialize task 0_0 due to timeout."));
            Assert.assertEquals(expected.getCause().getClass(), TimeoutException.class);
            MatcherAssert.assertThat((Object)expected.getCause().getMessage(), (Matcher)CoreMatchers.is((Object)"test"));
        }
        LogCaptureAppender.unregister(appender);
    }

    @Test
    public void shouldHandleInitTransactionsTimeoutExceptionOnResume() {
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        ProcessorTopology topology = StreamTaskTest.withSources(Arrays.asList(new ProcessorNode[]{this.source1, this.source2, this.processorStreamTime, this.processorSystemTime}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}));
        this.source1.addChild(this.processorStreamTime);
        this.source2.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source2.addChild(this.processorSystemTime);
        final AtomicBoolean timeOut = new AtomicBoolean(false);
        StreamTask testTask = new StreamTask(this.taskId00, this.partitions, topology, this.consumer, (ChangelogReader)this.changelogReader, this.createConfig(true), this.streamsMetrics, this.stateDirectory, null, (Time)this.time, () -> {
            this.producer = new MockProducer<byte[], byte[]>(false, this.bytesSerializer, this.bytesSerializer){

                public void initTransactions() {
                    if (timeOut.get()) {
                        throw new TimeoutException("test");
                    }
                    super.initTransactions();
                }
            };
            return this.producer;
        }, null, null);
        testTask.initializeTopology();
        testTask.suspend();
        timeOut.set(true);
        try {
            testTask.resume();
            Assert.fail((String)"Expected an exception");
        }
        catch (StreamsException expected) {
            this.assertTimeoutErrorLog(appender);
            MatcherAssert.assertThat((Object)expected.getMessage(), (Matcher)CoreMatchers.is((Object)"task [0_0] Failed to initialize task 0_0 due to timeout."));
            Assert.assertEquals(expected.getCause().getClass(), TimeoutException.class);
            MatcherAssert.assertThat((Object)expected.getCause().getMessage(), (Matcher)CoreMatchers.is((Object)"test"));
        }
        LogCaptureAppender.unregister(appender);
    }

    private void assertTimeoutErrorLog(LogCaptureAppender appender) {
        String expectedErrorLogMessage = "task [0_0] Timeout exception caught when initializing transactions for task 0_0. This might happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.";
        List expectedError = appender.getEvents().stream().filter(event -> event.getMessage().equals("task [0_0] Timeout exception caught when initializing transactions for task 0_0. This might happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.")).map(LogCaptureAppender.Event::getLevel).collect(Collectors.toList());
        MatcherAssert.assertThat(expectedError, (Matcher)CoreMatchers.is(Collections.singletonList("ERROR")));
    }

    @Test
    public void testProcessOrder() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecord(this.partition1, 10L), this.getConsumerRecord(this.partition1, 20L), this.getConsumerRecord(this.partition1, 30L)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecord(this.partition2, 25L), this.getConsumerRecord(this.partition2, 35L), this.getConsumerRecord(this.partition2, 45L)));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)5L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)4L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)3L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)2L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)1L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)0L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
    }

    @Test
    public void testMetrics() {
        this.task = this.createStatelessTask(this.createConfig(false));
        Assert.assertNotNull((Object)this.getMetric("%s-latency-avg", "The average latency of %s operation.", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("%s-latency-max", "The max latency of %s operation.", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("%s-rate", "The average number of occurrence of %s operation per second.", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("%s-latency-avg", "The average latency of %s operation.", "all"));
        Assert.assertNotNull((Object)this.getMetric("%s-latency-max", "The max latency of %s operation.", "all"));
        Assert.assertNotNull((Object)this.getMetric("%s-rate", "The average number of occurrence of %s operation per second.", "all"));
        JmxReporter reporter = new JmxReporter("kafka.streams");
        this.metrics.addReporter((MetricsReporter)reporter);
        Assert.assertTrue((boolean)reporter.containsMbean(String.format("kafka.streams:type=stream-task-metrics,client-id=test,task-id=%s", this.task.id.toString())));
        Assert.assertTrue((boolean)reporter.containsMbean("kafka.streams:type=stream-task-metrics,client-id=test,task-id=all"));
    }

    private KafkaMetric getMetric(String nameFormat, String descriptionFormat, String taskId) {
        return (KafkaMetric)this.metrics.metrics().get(this.metrics.metricName(String.format(nameFormat, "commit"), "stream-task-metrics", String.format(descriptionFormat, "commit"), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"task-id", (Object)taskId), Utils.mkEntry((Object)"client-id", (Object)"test")})));
    }

    @Test
    public void testPauseResume() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecord(this.partition1, 10L), this.getConsumerRecord(this.partition1, 20L)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecord(this.partition2, 35L), this.getConsumerRecord(this.partition2, 45L), this.getConsumerRecord(this.partition2, 55L), this.getConsumerRecord(this.partition2, 65L)));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecord(this.partition1, 30L), this.getConsumerRecord(this.partition1, 40L), this.getConsumerRecord(this.partition1, 50L)));
        Assert.assertEquals((long)2L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition1));
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertEquals((long)0L, (long)this.consumer.paused().size());
    }

    @Test
    public void shouldPunctuateOnceStreamTimeAfterGap() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecord(this.partition1, 20L), this.getConsumerRecord(this.partition1, 142L), this.getConsumerRecord(this.partition1, 155L), this.getConsumerRecord(this.partition1, 160L)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecord(this.partition2, 25L), this.getConsumerRecord(this.partition2, 145L), this.getConsumerRecord(this.partition2, 159L), this.getConsumerRecord(this.partition2, 161L)));
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)7L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)6L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)5L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)4L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)3L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)2L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)1L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)0L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)4L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        this.processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
    }

    @Test
    public void shouldRespectPunctuateCancellationStreamTime() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecord(this.partition1, 20L), this.getConsumerRecord(this.partition1, 30L), this.getConsumerRecord(this.partition1, 40L)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecord(this.partition2, 25L), this.getConsumerRecord(this.partition2, 35L), this.getConsumerRecord(this.partition2, 45L)));
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        this.processorStreamTime.mockProcessor.scheduleCancellable.cancel();
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        this.processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
    }

    @Test
    public void shouldRespectPunctuateCancellationSystemTime() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        long now = this.time.milliseconds();
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.scheduleCancellable.cancel();
        this.time.sleep(10L);
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10L);
    }

    @Test
    public void shouldRespectCommitNeeded() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        Assert.assertFalse((boolean)this.task.commitNeeded());
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecord(this.partition1, 0L)));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.commit();
        Assert.assertFalse((boolean)this.task.commitNeeded());
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.commit();
        Assert.assertFalse((boolean)this.task.commitNeeded());
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.commit();
        Assert.assertFalse((boolean)this.task.commitNeeded());
    }

    @Test
    public void shouldRespectCommitRequested() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.requestCommit();
        Assert.assertTrue((boolean)this.task.commitRequested());
    }

    @Test
    public void shouldBeProcessableIfAllPartitionsBuffered() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        Assert.assertFalse((boolean)this.task.isProcessable(0L));
        byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
        this.task.addRecords(this.partition1, Collections.singleton(new ConsumerRecord("topic1", 1, 0L, (Object)bytes, (Object)bytes)));
        Assert.assertFalse((boolean)this.task.isProcessable(0L));
        this.task.addRecords(this.partition2, Collections.singleton(new ConsumerRecord("topic2", 1, 0L, (Object)bytes, (Object)bytes)));
        Assert.assertTrue((boolean)this.task.isProcessable(0L));
    }

    @Test
    public void shouldBeProcessableIfWaitedForTooLong() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        MetricName enforcedProcessMetric = this.metrics.metricName("enforced-processing-total", "stream-task-metrics", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client-id", (Object)"test"), Utils.mkEntry((Object)"task-id", (Object)this.taskId00.toString())}));
        Assert.assertFalse((boolean)this.task.isProcessable(0L));
        Assert.assertEquals((Object)0.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
        this.task.addRecords(this.partition1, Collections.singleton(new ConsumerRecord("topic1", 1, 0L, (Object)bytes, (Object)bytes)));
        Assert.assertFalse((boolean)this.task.isProcessable(this.time.milliseconds()));
        Assert.assertFalse((boolean)this.task.isProcessable(this.time.milliseconds() + 50L));
        Assert.assertTrue((boolean)this.task.isProcessable(this.time.milliseconds() + 100L));
        Assert.assertEquals((Object)1.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        Assert.assertTrue((boolean)this.task.isProcessable(this.time.milliseconds() + 101L));
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        this.task.addRecords(this.partition2, Collections.singleton(new ConsumerRecord("topic2", 1, 0L, (Object)bytes, (Object)bytes)));
        Assert.assertTrue((boolean)this.task.isProcessable(this.time.milliseconds() + 130L));
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        this.task.process();
        Assert.assertFalse((boolean)this.task.isProcessable(this.time.milliseconds() + 150L));
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        Assert.assertFalse((boolean)this.task.isProcessable(this.time.milliseconds() + 249L));
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        Assert.assertTrue((boolean)this.task.isProcessable(this.time.milliseconds() + 250L));
        Assert.assertEquals((Object)3.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
    }

    @Test
    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        long now = this.time.milliseconds();
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(9L);
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(1L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(20L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10L, now + 20L, now + 30L, now + 50L);
    }

    @Test
    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(9L);
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, new long[0]);
    }

    @Test
    public void shouldPunctuateOnceSystemTimeAfterGap() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        long now = this.time.milliseconds();
        this.time.sleep(100L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(12L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(7L);
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(1L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(105L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(5L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100L, now + 110L, now + 122L, now + 130L, now + 235L, now + 240L);
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
        this.task = this.createTaskThatThrowsException(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.addRecords(this.partition2, Collections.singletonList(this.getConsumerRecord(this.partition2, 0L)));
        try {
            this.task.process();
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (Exception e) {
            MatcherAssert.assertThat((Object)this.task.processorContext.currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.punctuate(this.processorStreamTime, 1L, PunctuationType.STREAM_TIME, new Punctuator(){

                public void punctuate(long timestamp) {
                    throw new KafkaException("KABOOM!");
                }
            });
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue((String)("message=" + message + " should contain processor"), (boolean)message.contains("processor '" + this.processorStreamTime.name() + "'"));
            MatcherAssert.assertThat((Object)this.task.processorContext.currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.punctuate(this.processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, new Punctuator(){

                public void punctuate(long timestamp) {
                    throw new KafkaException("KABOOM!");
                }
            });
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue((String)("message=" + message + " should contain processor"), (boolean)message.contains("processor '" + this.processorSystemTime.name() + "'"));
            MatcherAssert.assertThat((Object)this.task.processorContext.currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldFlushRecordCollectorOnFlushState() {
        final AtomicBoolean flushed = new AtomicBoolean(false);
        MockStreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
        StreamTask streamTask = new StreamTask(this.taskId00, this.partitions, this.topology, this.consumer, (ChangelogReader)this.changelogReader, this.createConfig(false), (StreamsMetricsImpl)streamsMetrics, this.stateDirectory, null, (Time)this.time, () -> {
            this.producer = new MockProducer(false, this.bytesSerializer, this.bytesSerializer);
            return this.producer;
        }, (RecordCollector)new NoOpRecordCollector(){

            @Override
            public void flush() {
                flushed.set(true);
            }
        }, this.metrics.sensor("dummy"));
        streamTask.flushState();
        Assert.assertTrue((boolean)flushed.get());
    }

    @Test
    public void shouldCheckpointOffsetsOnCommit() throws IOException {
        this.task = this.createStatefulTask(this.createConfig(false), true);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.commit();
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.stateDirectory.directoryForTask(this.taskId00), ".checkpoint"));
        MatcherAssert.assertThat((Object)checkpoint.read(), (Matcher)CoreMatchers.equalTo(Collections.singletonMap(this.changelogPartition, this.offset)));
    }

    @Test
    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
        this.task = this.createStatefulTask(this.createConfig(true), true);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.commit();
        File checkpointFile = new File(this.stateDirectory.directoryForTask(this.taskId00), ".checkpoint");
        Assert.assertFalse((boolean)checkpointFile.exists());
    }

    @Test
    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.processorContext.setCurrentNode(this.processorStreamTime);
        try {
            this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator);
            Assert.fail((String)"Should throw illegal state exception as current node is not null");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void shouldCallPunctuateOnPassedInProcessorNode() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.punctuate(this.processorStreamTime, 5L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat((Object)this.punctuatedAt, (Matcher)CoreMatchers.equalTo((Object)5L));
        this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat((Object)this.punctuatedAt, (Matcher)CoreMatchers.equalTo((Object)10L));
    }

    @Test
    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccessfulPunctuate() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.punctuate(this.processorStreamTime, 5L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat((Object)((ProcessorContextImpl)this.task.context()).currentNode(), (Matcher)CoreMatchers.nullValue());
    }

    @Test(expected=IllegalStateException.class)
    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.schedule(1L, PunctuationType.STREAM_TIME, new Punctuator(){

            public void punctuate(long timestamp) {
            }
        });
    }

    @Test
    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.processorContext.setCurrentNode(this.processorStreamTime);
        this.task.schedule(1L, PunctuationType.STREAM_TIME, new Punctuator(){

            public void punctuate(long timestamp) {
            }
        });
    }

    @Test
    public void shouldNotCloseProducerOnCleanCloseWithEosDisabled() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.close(true, false);
        this.task = null;
        Assert.assertFalse((boolean)this.producer.closed());
    }

    @Test
    public void shouldNotCloseProducerOnUncleanCloseWithEosDisabled() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.close(false, false);
        this.task = null;
        Assert.assertFalse((boolean)this.producer.closed());
    }

    @Test
    public void shouldNotCloseProducerOnErrorDuringCleanCloseWithEosDisabled() {
        this.task = this.createTaskThatThrowsException(false);
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have thrown runtime exception");
        }
        catch (RuntimeException expected) {
            this.task = null;
        }
        Assert.assertFalse((boolean)this.producer.closed());
    }

    @Test
    public void shouldNotCloseProducerOnErrorDuringUncleanCloseWithEosDisabled() {
        this.task = this.createTaskThatThrowsException(false);
        this.task.close(false, false);
        this.task = null;
        Assert.assertFalse((boolean)this.producer.closed());
    }

    @Test
    public void shouldCommitTransactionAndCloseProducerOnCleanCloseWithEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.task.close(true, false);
        this.task = null;
        Assert.assertTrue((boolean)this.producer.transactionCommitted());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
        Assert.assertTrue((boolean)this.producer.closed());
    }

    @Test
    public void shouldNotAbortTransactionAndNotCloseProducerOnErrorDuringCleanCloseWithEosEnabled() {
        this.task = this.createTaskThatThrowsException(true);
        this.task.initializeTopology();
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have thrown runtime exception");
        }
        catch (RuntimeException expected) {
            this.task = null;
        }
        Assert.assertTrue((boolean)this.producer.transactionInFlight());
        Assert.assertFalse((boolean)this.producer.closed());
    }

    @Test
    public void shouldOnlyCloseProducerIfFencedOnCommitDuringCleanCloseWithEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.producer.fenceProducer();
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have thrown TaskMigratedException");
        }
        catch (TaskMigratedException expected) {
            this.task = null;
            Assert.assertTrue((boolean)(expected.getCause() instanceof ProducerFencedException));
        }
        Assert.assertFalse((boolean)this.producer.transactionCommitted());
        Assert.assertTrue((boolean)this.producer.transactionInFlight());
        Assert.assertFalse((boolean)this.producer.transactionAborted());
        Assert.assertFalse((boolean)this.producer.transactionCommitted());
        Assert.assertTrue((boolean)this.producer.closed());
    }

    @Test
    public void shouldNotCloseProducerIfFencedOnCloseDuringCleanCloseWithEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.producer.fenceProducerOnClose();
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have thrown TaskMigratedException");
        }
        catch (TaskMigratedException expected) {
            this.task = null;
            Assert.assertTrue((boolean)(expected.getCause() instanceof ProducerFencedException));
        }
        Assert.assertTrue((boolean)this.producer.transactionCommitted());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
        Assert.assertFalse((boolean)this.producer.closed());
    }

    @Test
    public void shouldAbortTransactionAndCloseProducerOnUncleanCloseWithEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.task.close(false, false);
        this.task = null;
        Assert.assertTrue((boolean)this.producer.transactionAborted());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
        Assert.assertTrue((boolean)this.producer.closed());
    }

    @Test
    public void shouldAbortTransactionAndCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
        this.task = this.createTaskThatThrowsException(true);
        this.task.initializeTopology();
        this.task.close(false, false);
        Assert.assertTrue((boolean)this.producer.transactionAborted());
        Assert.assertTrue((boolean)this.producer.closed());
    }

    @Test
    public void shouldOnlyCloseProducerIfFencedOnAbortDuringUncleanCloseWithEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.producer.fenceProducer();
        this.task.close(false, false);
        this.task = null;
        Assert.assertTrue((boolean)this.producer.transactionInFlight());
        Assert.assertFalse((boolean)this.producer.transactionAborted());
        Assert.assertFalse((boolean)this.producer.transactionCommitted());
        Assert.assertTrue((boolean)this.producer.closed());
    }

    @Test
    public void shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.producer.fenceProducerOnClose();
        this.task.close(false, false);
        this.task = null;
        Assert.assertTrue((boolean)this.producer.transactionAborted());
        Assert.assertFalse((boolean)this.producer.closed());
    }

    @Test
    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() {
        this.task = this.createTaskThatThrowsException(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have thrown runtime exception");
        }
        catch (RuntimeException expected) {
            this.task = null;
        }
        Assert.assertTrue((boolean)this.processorSystemTime.closed);
        Assert.assertTrue((boolean)this.processorStreamTime.closed);
        Assert.assertTrue((boolean)this.source1.closed);
    }

    @Test
    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        Assert.assertTrue((boolean)this.producer.transactionInitialized());
        Assert.assertTrue((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldWrapProducerFencedExceptionWithTaskMigratedExceptionForBeginTransaction() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.producer.fenceProducer();
        try {
            this.task.initializeTopology();
            Assert.fail((String)"Should have throws TaskMigratedException");
        }
        catch (TaskMigratedException expected) {
            Assert.assertTrue((boolean)(expected.getCause() instanceof ProducerFencedException));
        }
    }

    @Test
    public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        Assert.assertTrue((!this.producer.transactionInFlight() ? 1 : 0) != 0);
        this.task.close(false, false);
    }

    @Test
    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
        this.task = this.createStatelessTask(this.createConfig(false));
        Assert.assertFalse((boolean)this.producer.transactionInitialized());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecord(this.partition1, 0L)));
        this.task.process();
        this.task.suspend();
        Assert.assertTrue((boolean)this.producer.sentOffsets());
        Assert.assertTrue((boolean)this.producer.transactionCommitted());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.task.suspend();
        Assert.assertTrue((boolean)this.producer.transactionCommitted());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecord(this.partition1, 0L)));
        this.task.process();
        this.task.suspend();
        Assert.assertFalse((boolean)this.producer.sentOffsets());
        Assert.assertFalse((boolean)this.producer.transactionCommitted());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldWrapProducerFencedExceptionWithTaskMigragedExceptionInSuspendWhenCommitting() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.producer.fenceProducer();
        try {
            this.task.suspend();
            Assert.fail((String)"Should have throws TaskMigratedException");
        }
        catch (TaskMigratedException expected) {
            Assert.assertTrue((boolean)(expected.getCause() instanceof ProducerFencedException));
        }
        this.task = null;
        Assert.assertFalse((boolean)this.producer.transactionCommitted());
    }

    @Test
    public void shouldWrapProducerFencedExceptionWithTaskMigragedExceptionInSuspendWhenClosingProducer() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.producer.fenceProducerOnClose();
        try {
            this.task.suspend();
            Assert.fail((String)"Should have throws TaskMigratedException");
        }
        catch (TaskMigratedException expected) {
            Assert.assertTrue((boolean)(expected.getCause() instanceof ProducerFencedException));
        }
        Assert.assertTrue((boolean)this.producer.transactionCommitted());
    }

    @Test
    public void shouldStartNewTransactionOnResumeIfEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecord(this.partition1, 0L)));
        this.task.process();
        this.task.suspend();
        this.task.resume();
        this.task.initializeTopology();
        Assert.assertTrue((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecord(this.partition1, 0L)));
        this.task.process();
        this.task.suspend();
        this.task.resume();
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldStartNewTransactionOnCommitIfEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecord(this.partition1, 0L)));
        this.task.process();
        this.task.commit();
        Assert.assertTrue((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecord(this.partition1, 0L)));
        this.task.process();
        this.task.commit();
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.close(false, true);
        this.task = null;
        Assert.assertFalse((boolean)this.producer.transactionAborted());
    }

    @Test
    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
        this.task = this.createStatelessTask(this.createConfig(false));
        this.task.close(false, false);
        this.task = null;
        Assert.assertFalse((boolean)this.producer.transactionAborted());
    }

    @Test
    public void shouldCloseProducerOnCloseWhenEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        this.task.initializeTopology();
        this.task.close(true, false);
        this.task = null;
        Assert.assertTrue((boolean)this.producer.closed());
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() {
        this.task = this.createTaskThatThrowsException(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.commit();
            Assert.fail((String)"should have thrown an exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
        StreamTask task = this.createTaskThatThrowsException(false);
        task.initializeStateStores();
        task.initializeTopology();
        try {
            task.suspend();
            Assert.fail((String)"should have thrown an exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void shouldCloseStateManagerIfFailureOnTaskClose() {
        this.task = this.createStatefulTaskThatThrowsExceptionOnClose();
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have thrown an exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.task = null;
        Assert.assertFalse((boolean)this.stateStore.isOpen());
    }

    @Test
    public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
        StreamTask task = this.createTaskThatThrowsException(false);
        try {
            task.close(false, false);
        }
        catch (Exception e) {
            Assert.fail((String)"should have not closed non-initialized topology");
        }
    }

    @Test
    public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
        StreamTask task = this.createStatefulTask(this.createConfig(false), false);
        Assert.assertTrue((boolean)task.initializeStateStores());
    }

    @Test
    public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
        StreamTask task = this.createStatefulTask(this.createConfig(false), true);
        Assert.assertFalse((boolean)task.initializeStateStores());
    }

    @Test
    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
        TopicPartition repartition = new TopicPartition("repartition", 1);
        ProcessorTopology topology = StreamTaskTest.withRepartitionTopics(Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source2}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)repartition.topic(), this.source2)}), Collections.singleton(repartition.topic()));
        this.consumer.assign(Arrays.asList(this.partition1, repartition));
        this.task = new StreamTask(this.taskId00, (Collection)Utils.mkSet((Object[])new TopicPartition[]{this.partition1, repartition}), topology, this.consumer, (ChangelogReader)this.changelogReader, this.createConfig(false), this.streamsMetrics, this.stateDirectory, null, (Time)this.time, () -> {
            this.producer = new MockProducer(false, this.bytesSerializer, this.bytesSerializer);
            return this.producer;
        }, this.metrics.sensor("dummy"));
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecord(this.partition1, 5L)));
        this.task.addRecords(repartition, Collections.singletonList(this.getConsumerRecord(repartition, 10L)));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertTrue((boolean)this.task.process());
        this.task.commit();
        Map map = this.task.purgableOffsets();
        MatcherAssert.assertThat((Object)map, (Matcher)CoreMatchers.equalTo(Collections.singletonMap(repartition, 11L)));
    }

    @Test
    public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
        this.task = this.createStatelessTask(this.createConfig(true));
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have throw IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        this.task = null;
        Assert.assertTrue((boolean)this.producer.closed());
    }

    @Test
    public void shouldAlwaysCommitIfEosEnabled() {
        this.task = this.createStatelessTask(this.createConfig(true));
        final RecordCollectorImpl recordCollector = new RecordCollectorImpl("StreamTask", new LogContext("StreamTaskTest "), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollector.init(this.producer);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.punctuate(this.processorSystemTime, 5L, PunctuationType.WALL_CLOCK_TIME, new Punctuator(){

            public void punctuate(long timestamp) {
                recordCollector.send("result-topic1", (Object)3, (Object)5, null, Integer.valueOf(0), Long.valueOf(StreamTaskTest.this.time.milliseconds()), (Serializer)new IntegerSerializer(), (Serializer)new IntegerSerializer());
            }
        });
        this.task.commit();
        Assert.assertEquals((long)1L, (long)this.producer.history().size());
    }

    private StreamTask createStatefulTask(StreamsConfig config, boolean logged) {
        ProcessorTopology topology = ProcessorTopologyFactories.with(Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source2}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}), Collections.singletonList(this.stateStore), logged ? Collections.singletonMap("store", "store-changelog") : Collections.emptyMap());
        return new StreamTask(this.taskId00, this.partitions, topology, this.consumer, (ChangelogReader)this.changelogReader, config, this.streamsMetrics, this.stateDirectory, null, (Time)this.time, () -> {
            this.producer = new MockProducer(false, this.bytesSerializer, this.bytesSerializer);
            return this.producer;
        }, this.metrics.sensor("dummy"));
    }

    private StreamTask createStatefulTaskThatThrowsExceptionOnClose() {
        ProcessorTopology topology = ProcessorTopologyFactories.with(Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source3}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source3)}), Collections.singletonList(this.stateStore), Collections.emptyMap());
        return new StreamTask(this.taskId00, this.partitions, topology, this.consumer, (ChangelogReader)this.changelogReader, this.createConfig(true), this.streamsMetrics, this.stateDirectory, null, (Time)this.time, () -> {
            this.producer = new MockProducer(false, this.bytesSerializer, this.bytesSerializer);
            return this.producer;
        }, this.metrics.sensor("dummy"));
    }

    private StreamTask createStatelessTask(StreamsConfig streamsConfig) {
        ProcessorTopology topology = StreamTaskTest.withSources(Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source2, this.processorStreamTime, this.processorSystemTime}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}));
        this.source1.addChild(this.processorStreamTime);
        this.source2.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source2.addChild(this.processorSystemTime);
        return new StreamTask(this.taskId00, this.partitions, topology, this.consumer, (ChangelogReader)this.changelogReader, streamsConfig, this.streamsMetrics, this.stateDirectory, null, (Time)this.time, () -> {
            this.producer = new MockProducer(false, this.bytesSerializer, this.bytesSerializer);
            return this.producer;
        }, this.metrics.sensor("dummy"));
    }

    private StreamTask createTaskThatThrowsException(boolean enableEos) {
        ProcessorTopology topology = StreamTaskTest.withSources(Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source3, this.processorStreamTime, this.processorSystemTime}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source3)}));
        this.source1.addChild(this.processorStreamTime);
        this.source3.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source3.addChild(this.processorSystemTime);
        return new StreamTask(this.taskId00, this.partitions, topology, (Consumer)this.consumer, (ChangelogReader)this.changelogReader, this.createConfig(enableEos), this.streamsMetrics, this.stateDirectory, null, (Time)this.time, () -> {
            this.producer = new MockProducer(false, this.bytesSerializer, this.bytesSerializer);
            return this.producer;
        }, this.metrics.sensor("dummy")){

            protected void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };
    }

    private ConsumerRecord<byte[], byte[]> getConsumerRecord(TopicPartition topicPartition, long offset) {
        return new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), offset, offset, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue);
    }
}

