/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockStateStore;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamTaskTest {
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final TopicPartition partition1 = new TopicPartition("topic1", 1);
    private final TopicPartition partition2 = new TopicPartition("topic2", 1);
    private final Set<TopicPartition> partitions = Utils.mkSet((Object[])new TopicPartition[]{this.partition1, this.partition2});
    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<Integer, Integer>(new String[]{"topic1"}, this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<Integer, Integer>(new String[]{"topic2"}, this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(new String[]{"topic2"}, this.intDeserializer, this.intDeserializer){

        @Override
        public void process(Integer key, Integer value) {
            throw new RuntimeException("KABOOM!");
        }

        @Override
        public void close() {
            throw new RuntimeException("KABOOM!");
        }
    };
    private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode(10L);
    private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode(10L, PunctuationType.WALL_CLOCK_TIME);
    private final String storeName = "store";
    private final StateStore stateStore = new MockStateStore("store", false);
    private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 0);
    private final Long offset = 543L;
    private final ProcessorTopology topology = ProcessorTopology.withSources((List)Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source2, this.processorStreamTime, this.processorSystemTime}), (Map)new HashMap<String, SourceNode>(){
        {
            this.put("topic1", StreamTaskTest.this.source1);
            this.put("topic2", StreamTaskTest.this.source2);
        }
    });
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final MockProducer<byte[], byte[]> producer = new MockProducer(false, this.bytesSerializer, this.bytesSerializer);
    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    private final StoreChangelogReader changelogReader = new StoreChangelogReader((Consumer)this.restoreStateConsumer, this.stateRestoreListener, new LogContext("stream-task-test ")){

        public Map<TopicPartition, Long> restoredOffsets() {
            return Collections.singletonMap(StreamTaskTest.this.changelogPartition, StreamTaskTest.this.offset);
        }
    };
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);
    private final Metrics metrics = new Metrics();
    private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(this.metrics);
    private final TaskId taskId00 = new TaskId(0, 0);
    private final MockTime time = new MockTime();
    private File baseDir = TestUtils.tempDirectory();
    private StateDirectory stateDirectory;
    private StreamsConfig config;
    private StreamsConfig eosConfig;
    private StreamTask task;
    private long punctuatedAt;
    private Punctuator punctuator = new Punctuator(){

        public void punctuate(long timestamp) {
            StreamTaskTest.this.punctuatedAt = timestamp;
        }
    };

    private StreamsConfig createConfig(final boolean enableEoS) throws IOException {
        return new StreamsConfig((Map)new Properties(){
            {
                this.setProperty("application.id", "stream-task-test");
                this.setProperty("bootstrap.servers", "localhost:2171");
                this.setProperty("buffered.records.per.partition", "3");
                this.setProperty("state.dir", StreamTaskTest.this.baseDir.getCanonicalPath());
                this.setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
                if (enableEoS) {
                    this.setProperty("processing.guarantee", "exactly_once");
                }
            }
        });
    }

    @Before
    public void setup() throws IOException {
        this.consumer.assign(Arrays.asList(this.partition1, this.partition2));
        this.config = this.createConfig(false);
        this.eosConfig = this.createConfig(true);
        this.stateDirectory = new StateDirectory(this.config, (Time)new MockTime());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @After
    public void cleanup() throws IOException {
        try {
            if (this.task != null) {
                try {
                    this.task.close(true, false);
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
        finally {
            Utils.delete((File)this.baseDir);
        }
    }

    @Test
    public void testProcessOrder() {
        this.task = this.createStatelessTask(false);
        this.task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.addRecords(this.partition2, this.records(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)5L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)4L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)3L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)2L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)1L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)0L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
    }

    private void testSpecificMetrics(String operation, String groupName, Map<String, String> tags) {
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName(operation + "-latency-avg", groupName, "The average latency of " + operation + " operation.", tags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName(operation + "-latency-max", groupName, "The max latency of " + operation + " operation.", tags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName(operation + "-rate", groupName, "The average number of occurrence of " + operation + " operation per second.", tags)));
    }

    @Test
    public void testMetrics() {
        this.task = this.createStatelessTask(false);
        String name = this.task.id().toString();
        LinkedHashMap<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("task-id", name);
        String operation = "commit";
        String groupName = "stream-task-metrics";
        Assert.assertNotNull((Object)this.metrics.getSensor("commit"));
        this.testSpecificMetrics("commit", "stream-task-metrics", metricTags);
        metricTags.put("task-id", "all");
        this.testSpecificMetrics("commit", "stream-task-metrics", metricTags);
    }

    @Test
    public void testPauseResume() {
        this.task = this.createStatelessTask(false);
        this.task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.addRecords(this.partition2, this.records(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 55L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 65L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        this.task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 40L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 50L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertEquals((long)2L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition1));
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertEquals((long)0L, (long)this.consumer.paused().size());
    }

    @Test
    public void testMaybePunctuateStreamTime() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 32L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 40L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 60L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.addRecords(this.partition2, this.records(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 61L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)8L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)7L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)6L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)5L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)4L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)3L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)2L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)1L, (long)this.task.numBuffered());
        Assert.assertEquals((long)5L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)0L, (long)this.task.numBuffered());
        Assert.assertEquals((long)5L, (long)this.source1.numReceived);
        Assert.assertEquals((long)4L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.process());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        this.processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
    }

    @Test
    public void shouldPunctuateOnceStreamTimeAfterGap() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 142L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 155L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 160L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.addRecords(this.partition2, this.records(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 145L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 159L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 161L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)7L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)6L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)5L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)4L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)3L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)2L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)1L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertEquals((long)0L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)4L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.process());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        this.processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
    }

    @Test
    public void testCancelPunctuateStreamTime() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 40L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.addRecords(this.partition2, this.records(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process());
        this.processorStreamTime.supplier.scheduleCancellable.cancel();
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        this.processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
    }

    @Test
    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        long now = this.time.milliseconds();
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(9L);
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(1L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(20L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10L, now + 20L, now + 30L, now + 50L);
    }

    @Test
    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(9L);
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, new long[0]);
    }

    @Test
    public void shouldPunctuateOnceSystemTimeAfterGap() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        long now = this.time.milliseconds();
        this.time.sleep(100L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(12L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(7L);
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(1L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(105L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(5L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100L, now + 110L, now + 122L, now + 130L, now + 235L, now + 240L);
    }

    @Test
    public void testCancelPunctuateSystemTime() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        long now = this.time.milliseconds();
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.supplier.scheduleCancellable.cancel();
        this.time.sleep(10L);
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10L);
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
        this.task = this.createTaskThatThrowsException();
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.addRecords(this.partition2, Collections.singletonList(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        try {
            this.task.process();
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (Exception e) {
            MatcherAssert.assertThat((Object)this.task.processorContext.currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.punctuate(this.processorStreamTime, 1L, PunctuationType.STREAM_TIME, new Punctuator(){

                public void punctuate(long timestamp) {
                    throw new KafkaException("KABOOM!");
                }
            });
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue((String)("message=" + message + " should contain processor"), (boolean)message.contains("processor '" + this.processorStreamTime.name() + "'"));
            MatcherAssert.assertThat((Object)this.task.processorContext.currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.punctuate(this.processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, new Punctuator(){

                public void punctuate(long timestamp) {
                    throw new KafkaException("KABOOM!");
                }
            });
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue((String)("message=" + message + " should contain processor"), (boolean)message.contains("processor '" + this.processorSystemTime.name() + "'"));
            MatcherAssert.assertThat((Object)this.task.processorContext.currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldFlushRecordCollectorOnFlushState() {
        final AtomicBoolean flushed = new AtomicBoolean(false);
        MockStreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
        StreamTask streamTask = new StreamTask(this.taskId00, this.partitions, this.topology, (Consumer)this.consumer, (ChangelogReader)this.changelogReader, this.config, (StreamsMetrics)streamsMetrics, this.stateDirectory, null, (Time)this.time, (Producer)this.producer){

            RecordCollector createRecordCollector(LogContext logContext, ProductionExceptionHandler exHandler) {
                return new NoOpRecordCollector(){

                    @Override
                    public void flush() {
                        flushed.set(true);
                    }
                };
            }
        };
        streamTask.flushState();
        Assert.assertTrue((boolean)flushed.get());
    }

    @Test
    public void shouldCheckpointOffsetsOnCommit() throws IOException {
        this.task = this.createStatefulTask(false, true);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.commit();
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.stateDirectory.directoryForTask(this.taskId00), ".checkpoint"));
        MatcherAssert.assertThat((Object)checkpoint.read(), (Matcher)CoreMatchers.equalTo(Collections.singletonMap(this.changelogPartition, this.offset)));
    }

    @Test
    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
        this.task = this.createStatefulTask(true, true);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.commit();
        File checkpointFile = new File(this.stateDirectory.directoryForTask(this.taskId00), ".checkpoint");
        Assert.assertFalse((boolean)checkpointFile.exists());
    }

    @Test
    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.processorContext.setCurrentNode(this.processorStreamTime);
        try {
            this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator);
            Assert.fail((String)"Should throw illegal state exception as current node is not null");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void shouldCallPunctuateOnPassedInProcessorNode() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.punctuate(this.processorStreamTime, 5L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat((Object)this.punctuatedAt, (Matcher)CoreMatchers.equalTo((Object)5L));
        this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat((Object)this.punctuatedAt, (Matcher)CoreMatchers.equalTo((Object)10L));
    }

    @Test
    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
        this.task = this.createStatelessTask(false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.punctuate(this.processorStreamTime, 5L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat((Object)((ProcessorContextImpl)this.task.context()).currentNode(), (Matcher)CoreMatchers.nullValue());
    }

    @Test(expected=IllegalStateException.class)
    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
        this.task = this.createStatelessTask(false);
        this.task.schedule(1L, PunctuationType.STREAM_TIME, new Punctuator(){

            public void punctuate(long timestamp) {
            }
        });
    }

    @Test
    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
        this.task = this.createStatelessTask(false);
        this.task.processorContext.setCurrentNode(this.processorStreamTime);
        this.task.schedule(1L, PunctuationType.STREAM_TIME, new Punctuator(){

            public void punctuate(long timestamp) {
            }
        });
    }

    @Test
    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() {
        this.task = this.createTaskThatThrowsException();
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have thrown runtime exception");
        }
        catch (RuntimeException e) {
            this.task = null;
        }
        Assert.assertTrue((boolean)this.processorSystemTime.closed);
        Assert.assertTrue((boolean)this.processorStreamTime.closed);
        Assert.assertTrue((boolean)this.source1.closed);
    }

    @Test
    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
        this.task = this.createStatelessTask(true);
        this.task.initializeTopology();
        Assert.assertTrue((boolean)this.producer.transactionInitialized());
        Assert.assertTrue((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
        this.task = this.createStatelessTask(false);
        Assert.assertFalse((boolean)this.producer.transactionInitialized());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
        this.task = this.createStatelessTask(true);
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.process();
        this.task.suspend();
        Assert.assertTrue((boolean)this.producer.sentOffsets());
        Assert.assertTrue((boolean)this.producer.transactionCommitted());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
        this.task = this.createStatelessTask(true);
        this.task.initializeTopology();
        this.task.suspend();
        Assert.assertTrue((boolean)this.producer.transactionCommitted());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() {
        this.task = this.createStatelessTask(false);
        this.task.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.process();
        this.task.suspend();
        Assert.assertFalse((boolean)this.producer.sentOffsets());
        Assert.assertFalse((boolean)this.producer.transactionCommitted());
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldStartNewTransactionOnResumeIfEosEnabled() {
        this.task = this.createStatelessTask(true);
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.process();
        this.task.suspend();
        this.task.resume();
        this.task.initializeTopology();
        Assert.assertTrue((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() {
        this.task = this.createStatelessTask(false);
        this.task.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.process();
        this.task.suspend();
        this.task.resume();
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldStartNewTransactionOnCommitIfEosEnabled() {
        this.task = this.createStatelessTask(true);
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.process();
        this.task.commit();
        Assert.assertTrue((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() {
        this.task = this.createStatelessTask(false);
        this.task.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.process();
        this.task.commit();
        Assert.assertFalse((boolean)this.producer.transactionInFlight());
    }

    @Test
    public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
        this.task = this.createStatelessTask(true);
        this.task.initializeTopology();
        this.task.close(false, false);
        this.task = null;
        Assert.assertTrue((boolean)this.producer.transactionAborted());
    }

    @Test
    public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() {
        this.task = this.createStatelessTask(true);
        this.task.close(false, true);
        this.task = null;
        Assert.assertFalse((boolean)this.producer.transactionAborted());
    }

    @Test
    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
        this.task = this.createStatelessTask(false);
        this.task.close(false, false);
        this.task = null;
        Assert.assertFalse((boolean)this.producer.transactionAborted());
    }

    @Test
    public void shouldCloseProducerOnCloseWhenEosEnabled() {
        this.task = this.createStatelessTask(true);
        this.task.close(true, false);
        this.task = null;
        Assert.assertTrue((boolean)this.producer.closed());
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() {
        this.task = this.createTaskThatThrowsException();
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.commit();
            Assert.fail((String)"should have thrown an exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
        StreamTask task = this.createTaskThatThrowsException();
        task.initializeStateStores();
        task.initializeTopology();
        try {
            task.suspend();
            Assert.fail((String)"should have thrown an exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void shouldCloseStateManagerIfFailureOnTaskClose() {
        this.task = this.createStatefulTaskThatThrowsExceptionOnClose(true, false);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have thrown an exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.task = null;
        Assert.assertFalse((boolean)this.stateStore.isOpen());
    }

    @Test
    public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
        StreamTask task = this.createTaskThatThrowsException();
        try {
            task.close(false, false);
        }
        catch (Exception e) {
            Assert.fail((String)"should have not closed unitialized topology");
        }
    }

    @Test
    public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
        StreamTask task = this.createStatefulTask(false, false);
        Assert.assertTrue((boolean)task.initializeStateStores());
    }

    @Test
    public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
        StreamTask task = this.createStatefulTask(false, true);
        Assert.assertFalse((boolean)task.initializeStateStores());
    }

    @Test
    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
        final TopicPartition repartition = new TopicPartition("repartition", 1);
        ProcessorTopology topology = ProcessorTopology.withRepartitionTopics((List)Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source2}), (Map)new HashMap<String, SourceNode>(){
            {
                this.put("topic1", StreamTaskTest.this.source1);
                this.put(repartition.topic(), StreamTaskTest.this.source2);
            }
        }, Collections.singleton(repartition.topic()));
        this.consumer.assign(Arrays.asList(this.partition1, repartition));
        this.task = new StreamTask(this.taskId00, (Collection)Utils.mkSet((Object[])new TopicPartition[]{this.partition1, repartition}), topology, this.consumer, (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics, this.stateDirectory, null, (Time)this.time, this.producer);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.task.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        this.task.addRecords(repartition, Collections.singletonList(new ConsumerRecord(repartition.topic(), repartition.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertTrue((boolean)this.task.process());
        Assert.assertTrue((boolean)this.task.process());
        this.task.commit();
        Map map = this.task.purgableOffsets();
        MatcherAssert.assertThat((Object)map, (Matcher)CoreMatchers.equalTo(Collections.singletonMap(repartition, 11L)));
    }

    private StreamTask createStatefulTask(boolean eosEnabled, boolean logged) {
        ProcessorTopology topology = ProcessorTopology.with((List)Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source2}), (Map)new HashMap<String, SourceNode>(){
            {
                this.put("topic1", StreamTaskTest.this.source1);
                this.put("topic2", StreamTaskTest.this.source2);
            }
        }, Collections.singletonList(this.stateStore), logged ? Collections.singletonMap("store", "store-changelog") : Collections.emptyMap());
        return new StreamTask(this.taskId00, this.partitions, topology, this.consumer, (ChangelogReader)this.changelogReader, eosEnabled ? this.eosConfig : this.config, this.streamsMetrics, this.stateDirectory, null, (Time)this.time, this.producer);
    }

    private StreamTask createStatefulTaskThatThrowsExceptionOnClose(boolean eosEnabled, boolean logged) {
        ProcessorTopology topology = ProcessorTopology.with((List)Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source3}), (Map)new HashMap<String, SourceNode>(){
            {
                this.put("topic1", StreamTaskTest.this.source1);
                this.put("topic2", StreamTaskTest.this.source3);
            }
        }, Collections.singletonList(this.stateStore), logged ? Collections.singletonMap("store", this.changelogPartition.topic()) : Collections.emptyMap());
        return new StreamTask(this.taskId00, this.partitions, topology, this.consumer, (ChangelogReader)this.changelogReader, eosEnabled ? this.eosConfig : this.config, this.streamsMetrics, this.stateDirectory, null, (Time)this.time, this.producer);
    }

    private StreamTask createStatelessTask(boolean eosEnabled) {
        ProcessorTopology topology = ProcessorTopology.withSources((List)Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source2, this.processorStreamTime, this.processorSystemTime}), (Map)new HashMap<String, SourceNode>(){
            {
                this.put("topic1", StreamTaskTest.this.source1);
                this.put("topic2", StreamTaskTest.this.source2);
            }
        });
        this.source1.addChild(this.processorStreamTime);
        this.source2.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source2.addChild(this.processorSystemTime);
        return new StreamTask(this.taskId00, this.partitions, topology, this.consumer, (ChangelogReader)this.changelogReader, eosEnabled ? this.eosConfig : this.config, this.streamsMetrics, this.stateDirectory, null, (Time)this.time, this.producer);
    }

    private StreamTask createTaskThatThrowsException() {
        ProcessorTopology topology = ProcessorTopology.withSources((List)Utils.mkList((Object[])new ProcessorNode[]{this.source1, this.source3, this.processorStreamTime, this.processorSystemTime}), (Map)new HashMap<String, SourceNode>(){
            {
                this.put("topic1", StreamTaskTest.this.source1);
                this.put("topic2", StreamTaskTest.this.source3);
            }
        });
        this.source1.addChild(this.processorStreamTime);
        this.source3.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source3.addChild(this.processorSystemTime);
        return new StreamTask(this.taskId00, this.partitions, topology, (Consumer)this.consumer, (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics, this.stateDirectory, null, (Time)this.time, (Producer)this.producer){

            protected void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };
    }

    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> ... recs) {
        return Arrays.asList(recs);
    }
}

