/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.IMocksControl;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;

@RunWith(value=EasyMockRunner.class)
public class StreamTaskTest {
    // Value supplied for the "application.id" setting in createConfig.
    private static final String APPLICATION_ID = "stream-task-test";
    // Temporary directory whose canonical path is used as "state.dir"; cleanup() deletes it after every test.
    private static final File BASE_DIR = TestUtils.tempDirectory();
    private static final long DEFAULT_TIMESTAMP = 1000L;
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    // Partition 1 of each of the two input topics; setup() assigns both to the mock consumer.
    private final TopicPartition partition1 = new TopicPartition("topic1", 1);
    private final TopicPartition partition2 = new TopicPartition("topic2", 1);
    private final Set<TopicPartition> partitions = Utils.mkSet((Object[])new TopicPartition[]{this.partition1, this.partition2});
    private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
    private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
    // Source nodes whose counters (numReceived) and flags (initialized) the tests assert on.
    private final MockSourceNode<Integer, Integer, Integer, Integer> source1 = new MockSourceNode(this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode(this.intDeserializer, this.intDeserializer);
    // Source node that always fails: both process() and close() throw RuntimeException("KABOOM!").
    private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(this.intDeserializer, this.intDeserializer){

        @Override
        public void process(Record<Integer, Integer> record) {
            throw new RuntimeException("KABOOM!");
        }

        @Override
        public void close() {
            throw new RuntimeException("KABOOM!");
        }
    };
    // NOTE(review): the 10L argument is presumably the punctuation interval -- confirm against MockProcessorNode.
    private final MockProcessorNode<Integer, Integer, ?, ?> processorStreamTime = new MockProcessorNode(10L);
    private final MockProcessorNode<Integer, Integer, ?, ?> processorSystemTime = new MockProcessorNode(10L, PunctuationType.WALL_CLOCK_TIME);
    private final String storeName = "store";
    // NOTE(review): the meaning of the boolean argument is not visible here -- check MockKeyValueStore.
    private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
    private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 1);
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    // Serialized forms of the integers 10 (value) and 1 (key).
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);
    // Name of the thread that constructed this test instance; used when building expected MBean names.
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0);
    // time and metrics are non-final because individual tests replace them.
    private MockTime time = new MockTime();
    private Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), (Time)this.time);
    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(this.metrics);
    private StateDirectory stateDirectory;
    // Task under test; cleanup() suspends and dirty-closes it when a test leaves it non-null.
    private StreamTask task;
    // Last timestamp handed to the punctuator declared below.
    private long punctuatedAt;
    // Nice mocks declared via @Mock for use with the EasyMockRunner configured on the class.
    @Mock(type=MockType.NICE)
    private ProcessorStateManager stateManager;
    @Mock(type=MockType.NICE)
    private RecordCollector recordCollector;
    @Mock(type=MockType.NICE)
    private ThreadCache cache;
    // Punctuator that simply remembers the timestamp it was invoked with.
    private final Punctuator punctuator = new Punctuator(){

        public void punctuate(long timestamp) {
            StreamTaskTest.this.punctuatedAt = timestamp;
        }
    };

    /**
     * Builds a {@link ProcessorTopology} from the given processor nodes and per-topic sources,
     * passing {@code repartitionTopics} as the last constructor argument. All remaining
     * constructor arguments are empty collections.
     */
    private static ProcessorTopology withRepartitionTopics(List<ProcessorNode<?, ?, ?, ?>> processorNodes, Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic, Set<String> repartitionTopics) {
        final ProcessorTopology topology = new ProcessorTopology(
            processorNodes,
            sourcesByTopic,
            Collections.emptyMap(),
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyMap(),
            repartitionTopics
        );
        return topology;
    }

    /**
     * Builds a {@link ProcessorTopology} from the given processor nodes and per-topic sources
     * with no repartition topics; every other constructor argument is an empty collection.
     */
    private static ProcessorTopology withSources(List<ProcessorNode<?, ?, ?, ?>> processorNodes, Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic) {
        // Same constructor call as withRepartitionTopics, with an empty repartition-topic set.
        return withRepartitionTopics(processorNodes, sourcesByTopic, Collections.emptySet());
    }

    /**
     * Creates the {@link StreamsConfig} used by the tests.
     *
     * @param enableEoS               selects "exactly_once" when true, "at_least_once" otherwise
     * @param enforcedProcessingValue value for the "max.task.idle.ms" setting
     * @throws RuntimeException wrapping the {@link IOException} if the canonical path of
     *                          {@code BASE_DIR} cannot be resolved
     */
    private static StreamsConfig createConfig(boolean enableEoS, String enforcedProcessingValue) {
        final String stateDirPath;
        try {
            stateDirPath = BASE_DIR.getCanonicalPath();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        final String processingGuarantee = enableEoS ? "exactly_once" : "at_least_once";
        final Map<String, String> settings = Utils.mkMap(
            Utils.mkEntry("application.id", APPLICATION_ID),
            Utils.mkEntry("bootstrap.servers", "localhost:2171"),
            Utils.mkEntry("buffered.records.per.partition", "3"),
            Utils.mkEntry("state.dir", stateDirPath),
            Utils.mkEntry("default.timestamp.extractor", MockTimestampExtractor.class.getName()),
            Utils.mkEntry("processing.guarantee", processingGuarantee),
            Utils.mkEntry("max.task.idle.ms", enforcedProcessingValue)
        );
        return new StreamsConfig(Utils.mkProperties(settings));
    }

    /**
     * Per-test fixture: stubs the state manager's task id and type, assigns both input
     * partitions to the mock consumer with beginning offset 0, and creates a fresh state directory.
     */
    @Before
    public void setup() {
        EasyMock.expect(stateManager.taskId()).andStubReturn(taskId);
        EasyMock.expect(stateManager.taskType()).andStubReturn(Task.TaskType.ACTIVE);

        consumer.assign(Arrays.asList(partition1, partition2));
        final Map<TopicPartition, Long> beginningOffsets = Utils.mkMap(
            Utils.mkEntry(partition1, 0L),
            Utils.mkEntry(partition2, 0L)
        );
        consumer.updateBeginningOffsets(beginningOffsets);

        final StreamsConfig config = createConfig(false, "100");
        stateDirectory = new StateDirectory(config, new MockTime(), true);
    }

    /**
     * Per-test teardown: best-effort suspends and dirty-closes the task left behind by the test,
     * then deletes {@code BASE_DIR}.
     *
     * <p>The directory is deleted in a {@code finally} block so that a failing suspend/close does
     * not leave state behind for subsequent tests.
     *
     * @throws IOException           if deleting {@code BASE_DIR} fails
     * @throws IllegalStateException if suspending fails for a reason other than the task already
     *                               being closed
     */
    @After
    public void cleanup() throws IOException {
        try {
            if (this.task != null) {
                try {
                    this.task.suspend();
                } catch (IllegalStateException maybeSwallow) {
                    // Only the "already closed" case is expected; a null message must not turn
                    // into a NullPointerException that hides the real failure.
                    final String message = maybeSwallow.getMessage();
                    if (message == null || !message.startsWith("Illegal state CLOSED")) {
                        throw maybeSwallow;
                    }
                } catch (RuntimeException ignored) {
                    // Deliberately ignored: suspending is best-effort here, the task is closed dirty below.
                }
                this.task.closeDirty();
            }
        } finally {
            this.task = null;
            Utils.delete(BASE_DIR);
        }
    }

    /**
     * A stateful task must throw {@link LockException} from {@code initializeIfNeeded()} when the
     * state directory reports that the task lock could not be taken.
     */
    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
        // State directory that refuses the lock.
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);

        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
        EasyMock.expectLastCall();

        EasyMock.replay(stateDirectory, stateManager);

        final StreamsConfig config = createConfig(false, "100");
        task = createStatefulTask(config, false);

        Assert.assertThrows(LockException.class, () -> task.initializeIfNeeded());
    }

    /**
     * Initializes a stateless task against a state-directory mock that has no recorded
     * expectations, then verifies that mock.
     */
    @Test
    public void shouldNotAttemptToLockIfNoStores() {
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.replay(stateDirectory);

        final StreamsConfig config = createConfig(false, "100");
        task = createStatelessTask(config, "latest");
        task.initializeIfNeeded();

        EasyMock.verify(stateDirectory);
    }

    /**
     * With exactly-once enabled, suspending and dirty-closing a task must hit the state manager
     * and state directory in exactly the order recorded on the strict control below.
     */
    @Test
    public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException {
        final IMocksControl strictControl = EasyMock.createStrictControl();
        final ProcessorStateManager strictStateManager = strictControl.createMock(ProcessorStateManager.class);
        EasyMock.expect(strictStateManager.taskType()).andStubReturn(Task.TaskType.ACTIVE);
        stateDirectory = strictControl.createMock(StateDirectory.class);

        // Recorded in the order the calls are expected to happen.
        strictStateManager.registerGlobalStateStores(Collections.emptyList());
        EasyMock.expectLastCall();

        EasyMock.expect(strictStateManager.taskId()).andReturn(taskId);

        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);

        strictStateManager.close();
        EasyMock.expectLastCall();

        final File storeBaseDir = TestUtils.tempDirectory("state_store");
        EasyMock.expect(strictStateManager.baseDir()).andReturn(storeBaseDir);

        stateDirectory.unlock(taskId);
        EasyMock.expectLastCall();

        strictControl.checkOrder(true);
        strictControl.replay();

        final StreamsConfig eosConfig = createConfig(true, "100");
        task = createStatefulTask(eosConfig, true, strictStateManager);
        task.suspend();
        task.closeDirty();
        // Already closed here, so cleanup() has nothing left to do for the task.
        task = null;

        strictControl.verify();
    }

    /**
     * The stream time encoded in the committed offset metadata (10) must be visible on the task
     * after initialization and restoration; before that the stream time is -1.
     */
    @Test
    public void shouldReadCommittedStreamTimeOnInitialize() {
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.replay(stateDirectory);

        // Commit offset 0 with an encoded timestamp of 10 for every input partition.
        final Map<TopicPartition, OffsetAndMetadata> committed = partitions.stream()
            .collect(Collectors.toMap(
                Function.identity(),
                tp -> new OffsetAndMetadata(0L, StreamTask.encodeTimestamp(10L))
            ));
        consumer.commitSync(committed);

        final StreamsConfig config = createConfig(false, "100");
        task = createStatelessTask(config, "latest");
        Assert.assertEquals(-1L, task.streamTime());

        task.initializeIfNeeded();
        task.completeRestoration();
        Assert.assertEquals(10L, task.streamTime());
    }

    /**
     * Walks a stateful task through CREATED -> RESTORING -> RUNNING and checks that the source
     * nodes are only initialized once restoration has completed.
     */
    @Test
    public void shouldTransitToRestoringThenRunningAfterCreation() throws IOException {
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(changelogPartition, 10L));
        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
        EasyMock.expectLastCall();
        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateDirectory, stateManager, recordCollector);

        final StreamsConfig config = createConfig(false, "100");
        task = createStatefulTask(config, true);
        Assert.assertEquals(Task.State.CREATED, task.state());

        task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RESTORING, task.state());
        for (final MockSourceNode<Integer, Integer, Integer, Integer> source : Arrays.asList(source1, source2)) {
            Assert.assertFalse(source.initialized);
        }

        // A second initialization call leaves the task in RESTORING.
        task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RESTORING, task.state());

        task.completeRestoration();
        Assert.assertEquals(Task.State.RUNNING, task.state());
        for (final MockSourceNode<Integer, Integer, Integer, Integer> source : Arrays.asList(source1, source2)) {
            Assert.assertTrue(source.initialized);
        }

        EasyMock.verify(stateDirectory);
    }

    /**
     * Buffers three records on each partition and processes them one at a time, checking after
     * every step how many records remain buffered and how many each source node has received.
     */
    @Test
    public void shouldProcessInOrder() {
        final StreamsConfig config = createConfig(false, "0");
        task = createStatelessTask(config, "latest");

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 10L),
            getConsumerRecord(partition1, 20L),
            getConsumerRecord(partition1, 30L)
        ));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 25L),
            getConsumerRecord(partition2, 35L),
            getConsumerRecord(partition2, 45L)
        ));

        // One row per process() call: {records still buffered, received by source1, received by source2}.
        final long[][] expectedAfterEachStep = {
            {5L, 1L, 0L},
            {4L, 2L, 0L},
            {3L, 2L, 1L},
            {2L, 3L, 1L},
            {1L, 3L, 2L},
            {0L, 3L, 3L},
        };
        for (final long[] expected : expectedAfterEachStep) {
            Assert.assertTrue(task.process(0L));
            Assert.assertEquals(expected[0], task.numBuffered());
            Assert.assertEquals(expected[1], source1.numReceived);
            Assert.assertEquals(expected[2], source2.numReceived);
        }
    }

    /**
     * The "active-buffer-count" task metric must report 0, then 2 after two records are added,
     * then 1 after one of them is processed (each time after the buffer size is recorded).
     */
    @Test
    public void shouldRecordBufferedRecords() {
        final StreamsConfig config = createConfig(false, "0");
        task = createStatelessTask(config, "latest");

        final KafkaMetric bufferCount = getMetric("active-buffer", "%s-count", task.id().toString(), "latest");
        MatcherAssert.assertThat(bufferCount.metricValue(), CoreMatchers.equalTo((Object) 0.0));

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 10L),
            getConsumerRecord(partition1, 20L)
        ));
        task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
        MatcherAssert.assertThat(bufferCount.metricValue(), CoreMatchers.equalTo((Object) 2.0));

        task.process(0L);
        task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
        MatcherAssert.assertThat(bufferCount.metricValue(), CoreMatchers.equalTo((Object) 1.0));
    }

    /**
     * The "active-process-ratio" task metric must only change when
     * {@code recordProcessTimeRatioAndBufferSize} is called, and must then reflect the batch
     * times recorded since the previous call.
     */
    @Test
    public void shouldRecordProcessRatio() {
        final StreamsConfig config = createConfig(false, "0");
        task = createStatelessTask(config, "latest");

        final KafkaMetric processRatio = getMetric("active-process", "%s-ratio", task.id().toString(), "latest");
        MatcherAssert.assertThat(processRatio.metricValue(), CoreMatchers.equalTo((Object) 0.0));

        // 10 + 15 recorded against a total of 100.
        task.recordProcessBatchTime(10L);
        task.recordProcessBatchTime(15L);
        task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
        MatcherAssert.assertThat(processRatio.metricValue(), CoreMatchers.equalTo((Object) 0.25));

        // Recording a batch time alone leaves the reported ratio unchanged.
        task.recordProcessBatchTime(10L);
        MatcherAssert.assertThat(processRatio.metricValue(), CoreMatchers.equalTo((Object) 0.25));

        // 10 + 10 recorded against a total of 20.
        task.recordProcessBatchTime(10L);
        task.recordProcessTimeRatioAndBufferSize(20L, time.milliseconds());
        MatcherAssert.assertThat(processRatio.metricValue(), CoreMatchers.equalTo((Object) 1.0));
    }

    /**
     * Feeds four records (keys 0..3, offset 0) through a source node that forwards only
     * even keys, and checks the avg/min/max "record-e2e-latency" metrics of the source node and
     * of the terminal node after each record.
     */
    @Test
    public void shouldRecordE2ELatencyOnSourceNodeAndTerminalNodes() {
        time = new MockTime(0L, 0L, 0L);
        metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);

        // Source node that forwards a record downstream only when its key is even.
        final MockSourceNode<Integer, Integer, Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer, Integer, Integer>(intDeserializer, intDeserializer) {
            InternalProcessorContext capturedContext;

            @Override
            public void init(InternalProcessorContext context) {
                capturedContext = context;
                super.init(context);
            }

            @Override
            public void process(Record<Integer, Integer> record) {
                final boolean evenKey = record.key() % 2 == 0;
                if (evenKey) {
                    capturedContext.forward(record);
                }
            }
        };

        task = createStatelessTaskWithForwardingTopology((SourceNode<Integer, Integer, Integer, Integer>) evenKeyForwardingSourceNode);
        task.initializeIfNeeded();
        task.completeRestoration();

        // Look up avg/min/max for the source node first, then for the terminal node.
        final String[] nodeNames = {evenKeyForwardingSourceNode.name(), processorStreamTime.name()};
        final String[] nameFormats = {"%s-avg", "%s-min", "%s-max"};
        final Metric[] latencyMetrics = new Metric[nodeNames.length * nameFormats.length];
        int slot = 0;
        for (final String nodeName : nodeNames) {
            for (final String nameFormat : nameFormats) {
                latencyMetrics[slot++] = getProcessorMetric("record-e2e-latency", nameFormat, task.id().toString(), nodeName, "latest");
            }
        }

        // Step i adds a record with key recordKeys[i] and processes it at wall-clock time processTimes[i].
        final int[] recordKeys = {0, 1, 2, 3};
        final long[] processTimes = {10L, 15L, 23L, 5L};
        // Expected {sourceAvg, sourceMin, sourceMax, terminalAvg, terminalMin, terminalMax} after each step.
        final double[][] expectedLatencies = {
            {10.0, 10.0, 10.0, 10.0, 10.0, 10.0},
            {12.5, 10.0, 15.0, 10.0, 10.0, 10.0},
            {16.0, 10.0, 23.0, 16.5, 10.0, 23.0},
            {13.25, 5.0, 23.0, 16.5, 10.0, 23.0},
        };

        for (int step = 0; step < recordKeys.length; step++) {
            task.addRecords(partition1, Collections.singletonList(getConsumerRecord(recordKeys[step], 0L)));
            task.process(processTimes[step]);
            for (int m = 0; m < latencyMetrics.length; m++) {
                MatcherAssert.assertThat(latencyMetrics[m].metricValue(), CoreMatchers.equalTo((Object) expectedLatencies[step][m]));
            }
        }
    }

    /** Runs the shared metrics checks for built-in metrics version "0.10.0-2.4". */
    @Test
    public void shouldConstructMetricsWithBuiltInMetricsVersion0100To24() {
        final String builtInMetricsVersion = "0.10.0-2.4";
        testMetrics(builtInMetricsVersion);
    }

    /** Runs the shared metrics checks for built-in metrics version "latest". */
    @Test
    public void shouldConstructMetricsWithBuiltInMetricsVersionLatest() {
        final String builtInMetricsVersion = "latest";
        testMetrics(builtInMetricsVersion);
    }

    /**
     * Creates a stateless task and checks that the task-level metrics expected for the given
     * built-in metrics version are registered, both in the metrics registry and as JMX MBeans.
     *
     * @param builtInMetricsVersion "0.10.0-2.4" or "latest"
     */
    private void testMetrics(String builtInMetricsVersion) {
        task = createStatelessTask(createConfig(false, "100"), builtInMetricsVersion);
        final String taskIdString = task.id().toString();

        // {operation, name format} pairs that must exist regardless of the metrics version.
        final String[][] alwaysPresent = {
            {"enforced-processing", "%s-rate"},
            {"enforced-processing", "%s-total"},
            {"record-lateness", "%s-avg"},
            {"record-lateness", "%s-max"},
            {"active-process", "%s-ratio"},
            {"active-buffer", "%s-count"},
        };
        for (final String[] metric : alwaysPresent) {
            Assert.assertNotNull(getMetric(metric[0], metric[1], taskIdString, builtInMetricsVersion));
        }

        final boolean legacyVersion = "0.10.0-2.4".equals(builtInMetricsVersion);
        if (legacyVersion) {
            testMetricsForBuiltInMetricsVersion0100To24();
        } else {
            testMetricsForBuiltInMetricsVersionLatest();
        }

        final JmxReporter reporter = new JmxReporter();
        final KafkaMetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
        reporter.contextChange(metricsContext);
        metrics.addReporter(reporter);

        final String threadIdTag;
        if ("latest".equals(builtInMetricsVersion)) {
            threadIdTag = "thread-id";
        } else {
            threadIdTag = "client-id";
        }
        final String taskMbean = String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=%s", threadIdTag, threadId, task.id());
        Assert.assertTrue(reporter.containsMbean(taskMbean));
        if (legacyVersion) {
            // The legacy version is additionally expected to expose an MBean with task-id "all".
            final String allTasksMbean = String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=all", threadIdTag, threadId);
            Assert.assertTrue(reporter.containsMbean(allTasksMbean));
        }
    }

    /**
     * Checks the metrics specific to built-in metrics version "latest": the "commit" metrics
     * tagged with task-id "all" must be absent, while the per-task "process" latency and
     * "punctuate" metrics must be present.
     */
    private void testMetricsForBuiltInMetricsVersionLatest() {
        // Single source of truth for the version passed to every lookup below.
        final String builtInMetricsVersion = "latest";
        final String taskIdString = this.task.id().toString();

        Assert.assertNull(this.getMetric("commit", "%s-latency-avg", "all", builtInMetricsVersion));
        Assert.assertNull(this.getMetric("commit", "%s-latency-max", "all", builtInMetricsVersion));
        Assert.assertNull(this.getMetric("commit", "%s-rate", "all", builtInMetricsVersion));
        Assert.assertNull(this.getMetric("commit", "%s-total", "all", builtInMetricsVersion));

        Assert.assertNotNull(this.getMetric("process", "%s-latency-max", taskIdString, builtInMetricsVersion));
        Assert.assertNotNull(this.getMetric("process", "%s-latency-avg", taskIdString, builtInMetricsVersion));
        Assert.assertNotNull(this.getMetric("punctuate", "%s-latency-avg", taskIdString, builtInMetricsVersion));
        Assert.assertNotNull(this.getMetric("punctuate", "%s-latency-max", taskIdString, builtInMetricsVersion));
        Assert.assertNotNull(this.getMetric("punctuate", "%s-rate", taskIdString, builtInMetricsVersion));
        Assert.assertNotNull(this.getMetric("punctuate", "%s-total", taskIdString, builtInMetricsVersion));
    }

    /**
     * Checks the metrics specific to built-in metrics version "0.10.0-2.4": the "commit-rate"
     * metric tagged with task-id "all" must be present, while the per-task "process" latency and
     * "punctuate" metrics must be absent.
     */
    private void testMetricsForBuiltInMetricsVersion0100To24() {
        // Single source of truth for the version passed to every lookup below.
        final String builtInMetricsVersion = "0.10.0-2.4";
        final String taskIdString = this.task.id().toString();

        Assert.assertNotNull(this.getMetric("commit", "%s-rate", "all", builtInMetricsVersion));

        Assert.assertNull(this.getMetric("process", "%s-latency-avg", taskIdString, builtInMetricsVersion));
        Assert.assertNull(this.getMetric("process", "%s-latency-max", taskIdString, builtInMetricsVersion));
        Assert.assertNull(this.getMetric("punctuate", "%s-latency-avg", taskIdString, builtInMetricsVersion));
        Assert.assertNull(this.getMetric("punctuate", "%s-latency-max", taskIdString, builtInMetricsVersion));
        Assert.assertNull(this.getMetric("punctuate", "%s-rate", taskIdString, builtInMetricsVersion));
        Assert.assertNull(this.getMetric("punctuate", "%s-total", taskIdString, builtInMetricsVersion));
    }

    /**
     * Looks up a task-level metric in the "stream-task-metrics" group.
     *
     * @param operation             operation name substituted into {@code nameFormat}
     * @param nameFormat            format string producing the metric name, e.g. "%s-rate"
     * @param taskId                value of the "task-id" tag
     * @param builtInMetricsVersion "latest" tags the current thread name as "thread-id"; any
     *                              other value tags it as "client-id"
     * @return the registered metric, or {@code null} if no metric with that name exists
     */
    private KafkaMetric getMetric(String operation, String nameFormat, String taskId, String builtInMetricsVersion) {
        // An empty description is passed on purpose: the lookup is not meant to verify it.
        final String descriptionIsNotVerified = "";
        final String threadTagKey = "latest".equals(builtInMetricsVersion) ? "thread-id" : "client-id";
        final Map<String, String> tags = Utils.mkMap(
            Utils.mkEntry("task-id", taskId),
            Utils.mkEntry(threadTagKey, Thread.currentThread().getName())
        );
        final MetricName name = this.metrics.metricName(
            String.format(nameFormat, operation),
            "stream-task-metrics",
            descriptionIsNotVerified,
            tags
        );
        return this.metrics.metrics().get(name);
    }

    /**
     * Looks up a processor-node-level metric in the "stream-processor-node-metrics" group,
     * filtering by task id, processor node id and the current thread's name.
     *
     * @param operation             operation name substituted into {@code nameFormat}
     * @param nameFormat            format string producing the metric name, e.g. "%s-avg"
     * @param taskId                value of the "task-id" tag
     * @param processorNodeId       value of the "processor-node-id" tag
     * @param builtInMetricsVersion "latest" tags the thread name as "thread-id", anything else as "client-id"
     */
    private Metric getProcessorMetric(String operation, String nameFormat, String taskId, String processorNodeId, String builtInMetricsVersion) {
        final String threadTagKey;
        if ("latest".equals(builtInMetricsVersion)) {
            threadTagKey = "thread-id";
        } else {
            threadTagKey = "client-id";
        }
        final Map<String, String> filterTags = Utils.mkMap(
            Utils.mkEntry("task-id", taskId),
            Utils.mkEntry("processor-node-id", processorNodeId),
            Utils.mkEntry(threadTagKey, Thread.currentThread().getName())
        );
        final String metricName = String.format(nameFormat, operation);
        return StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), metricName, "stream-processor-node-metrics", filterTags);
    }

    /**
     * Checks which partitions the consumer reports as paused while records are added to and
     * processed from the task (the config sets "buffered.records.per.partition" to 3).
     */
    @Test
    public void shouldPauseAndResumeBasedOnBufferedRecords() {
        final StreamsConfig config = createConfig(false, "100");
        task = createStatelessTask(config, "latest");

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 10L),
            getConsumerRecord(partition1, 20L)
        ));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 35L),
            getConsumerRecord(partition2, 45L),
            getConsumerRecord(partition2, 55L),
            getConsumerRecord(partition2, 65L)
        ));

        // First record comes from partition1; only partition2 is paused.
        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(1L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        Set<TopicPartition> paused = consumer.paused();
        Assert.assertEquals(1L, paused.size());
        Assert.assertTrue(paused.contains(partition2));

        // Adding three more records to partition1 pauses it as well.
        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 30L),
            getConsumerRecord(partition1, 40L),
            getConsumerRecord(partition1, 50L)
        ));
        paused = consumer.paused();
        Assert.assertEquals(2L, paused.size());
        Assert.assertTrue(paused.contains(partition1));
        Assert.assertTrue(paused.contains(partition2));

        // After the next record from partition1, only partition2 remains paused.
        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(2L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        paused = consumer.paused();
        Assert.assertEquals(1L, paused.size());
        Assert.assertTrue(paused.contains(partition2));

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(3L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        paused = consumer.paused();
        Assert.assertEquals(1L, paused.size());
        Assert.assertTrue(paused.contains(partition2));

        // The first record taken from partition2 leaves nothing paused.
        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(3L, source1.numReceived);
        Assert.assertEquals(1L, source2.numReceived);
        Assert.assertEquals(0L, consumer.paused().size());
    }

    /**
     * Feeds two partitions with records whose values jump from 20/25 to 142/145 and beyond, then
     * processes them one at a time, checking after every step whether a stream-time punctuation
     * fires. In this sequence {@code maybePunctuateStreamTime()} returns {@code true} after each
     * record delivered to source1 and {@code false} after each record delivered to source2, and
     * the punctuations finally recorded are 20, 142, 155 and 160 -- one per step despite the gap.
     * <p>
     * NOTE(review): the punctuation schedule itself is registered outside this method.
     */
    @Test
    public void shouldPunctuateOnceStreamTimeAfterGap() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig(false, "0"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        // Four records per partition, eight buffered in total.
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecord(this.partition1, 20L), this.getConsumerRecord(this.partition1, 142L), this.getConsumerRecord(this.partition1, 155L), this.getConsumerRecord(this.partition1, 160L)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecord(this.partition2, 25L), this.getConsumerRecord(this.partition2, 145L), this.getConsumerRecord(this.partition2, 159L), this.getConsumerRecord(this.partition2, 161L)));
        // Nothing has been processed yet, so there is nothing to punctuate.
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        // Step 1: source1 receives its first record -> punctuation fires.
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)7L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        // Step 2: source2 receives its first record -> no punctuation.
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)6L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        // Step 3: source1 receives its second record (after the gap) -> punctuation fires.
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)5L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        // Step 4: source2 receives its second record -> no punctuation.
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)4L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        // Step 5: source1 receives its third record -> punctuation fires.
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)3L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        // Step 6: source2 receives its third record -> no punctuation.
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)2L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        // Step 7: source1 receives its fourth record -> punctuation fires.
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)1L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        // Step 8: source2 receives its fourth record; the buffer is empty and no punctuation fires.
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)0L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)4L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        // Exactly four stream-time punctuations must have been recorded.
        this.processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
    }

    /**
     * Once the stream-time schedule of the mock processor has been cancelled, further calls to
     * {@code maybePunctuateStreamTime()} must not punctuate: only the punctuation at 20 is recorded.
     */
    @Test
    public void shouldRespectPunctuateCancellationStreamTime() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Arrays.asList(getConsumerRecord(partition1, 20L), getConsumerRecord(partition1, 30L), getConsumerRecord(partition1, 40L)));
        task.addRecords(partition2, Arrays.asList(getConsumerRecord(partition2, 25L), getConsumerRecord(partition2, 35L), getConsumerRecord(partition2, 45L)));

        // Before anything is processed there is nothing to punctuate.
        Assert.assertFalse(task.maybePunctuateStreamTime());

        // The first processed record triggers the one and only punctuation.
        Assert.assertTrue(task.process(0L));
        Assert.assertTrue(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process(0L));
        Assert.assertFalse(task.maybePunctuateStreamTime());

        // Cancel after a third record has been processed; the schedule must stay silent afterwards.
        Assert.assertTrue(task.process(0L));
        processorStreamTime.mockProcessor.scheduleCancellable().cancel();
        Assert.assertFalse(task.maybePunctuateStreamTime());

        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
    }

    /**
     * A wall-clock punctuation fires once after the mock clock advances by 10 ms; after the
     * schedule is cancelled, advancing the clock by another 10 ms must not fire it again.
     */
    @Test
    public void shouldRespectPunctuateCancellationSystemTime() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        final long start = time.milliseconds();

        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        processorSystemTime.mockProcessor.scheduleCancellable().cancel();

        time.sleep(10L);
        Assert.assertFalse(task.maybePunctuateSystemTime());

        // Only the punctuation taken before the cancellation is recorded.
        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, start + 10L);
    }

    /**
     * Checks the life cycle of {@code commitNeeded()}: it becomes {@code true} after processing a
     * record, after a stream-time punctuation and after a wall-clock punctuation; in each case it
     * stays {@code true} across {@code prepareCommit()} and is cleared by {@code postCommit(true)}.
     */
    @Test
    public void shouldRespectCommitNeeded() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "0"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // Nothing has happened yet.
        Assert.assertFalse(task.commitNeeded());

        // 1) processing a record
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        Assert.assertTrue(task.process(0L));
        Assert.assertTrue(task.commitNeeded());
        task.prepareCommit();
        Assert.assertTrue(task.commitNeeded());
        task.postCommit(true);
        Assert.assertFalse(task.commitNeeded());

        // 2) stream-time punctuation
        Assert.assertTrue(task.maybePunctuateStreamTime());
        Assert.assertTrue(task.commitNeeded());
        task.prepareCommit();
        Assert.assertTrue(task.commitNeeded());
        task.postCommit(true);
        Assert.assertFalse(task.commitNeeded());

        // 3) wall-clock punctuation after the mock clock advanced by 10 ms
        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertTrue(task.commitNeeded());
        task.prepareCommit();
        Assert.assertTrue(task.commitNeeded());
        task.postCommit(true);
        Assert.assertFalse(task.commitNeeded());
    }

    /**
     * After two of three buffered partition1 records have been handed to {@code process()},
     * {@code prepareCommit()} must report offset 5 for partition1 together with the encoded
     * timestamp 3.
     */
    @Test
    public void shouldCommitNextOffsetFromQueueIfAvailable() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "0"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Arrays.asList(getConsumerRecord(partition1, 0L), getConsumerRecord(partition1, 3L), getConsumerRecord(partition1, 5L)));

        // Two process() calls; their return values are intentionally not asserted here.
        task.process(0L);
        task.process(0L);

        Map committable = task.prepareCommit();
        Map expected = Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)partition1, (Object)new OffsetAndMetadata(5L, StreamTask.encodeTimestamp(3L)))
        });
        MatcherAssert.assertThat((Object)committable, (Matcher)CoreMatchers.equalTo((Object)expected));
    }

    /**
     * The mock consumer has polled three partition1 records while the task itself buffered and
     * processed only one; {@code prepareCommit()} must then report offset 3 for partition1
     * together with the encoded timestamp 0.
     */
    @Test
    public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "0"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // Hand records built with 0, 1 and 2 to the consumer and poll them.
        for (long value = 0L; value < 3L; value++) {
            consumer.addRecord(getConsumerRecord(partition1, value));
        }
        consumer.poll(Duration.ZERO);

        // The task only ever sees (and processes) a single record.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process(0L);

        Map committable = task.prepareCommit();
        Map expected = Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)partition1, (Object)new OffsetAndMetadata(3L, StreamTask.encodeTimestamp(0L)))
        });
        MatcherAssert.assertThat((Object)committable, (Matcher)CoreMatchers.equalTo((Object)expected));
    }

    /**
     * {@code prepareCommit()} on a task that has been suspended and moved to {@code CLOSED} must
     * fail with an {@link IllegalStateException} carrying the exact message asserted below.
     */
    @Test
    public void shouldFailOnCommitIfTaskIsClosed() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "0"), "latest");
        task.suspend();
        task.transitionTo(Task.State.CLOSED);

        IllegalStateException failure = Assert.assertThrows(IllegalStateException.class, task::prepareCommit);

        String expectedMessage = "Illegal state CLOSED while preparing active task 0_0 for committing";
        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)Matchers.is((Object)expectedMessage));
    }

    /**
     * After {@code requestCommit()} the task must report {@code commitRequested()} as true.
     */
    @Test
    public void shouldRespectCommitRequested() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.requestCommit();

        Assert.assertTrue(task.commitRequested());
    }

    /**
     * Round trip: a timestamp encoded with {@code StreamTask.encodeTimestamp} must decode back to
     * the same value.
     */
    @Test
    public void shouldEncodeAndDecodeMetadata() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");

        final long timestamp = 1000L;
        String encoded = StreamTask.encodeTimestamp(timestamp);

        Assert.assertEquals(timestamp, (long) task.decodeTimestamp(encoded));
    }

    /**
     * Decoding the Base64 form of a payload that consists of the single byte {@code 2} must yield
     * {@code -1}; per the test name, that byte stands for a version the decoder does not know.
     */
    @Test
    public void shouldReturnUnknownTimestampIfUnknownVersion() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");

        String encoded = Base64.getEncoder().encodeToString(new byte[]{2});

        Assert.assertEquals(-1L, (long) task.decodeTimestamp(encoded));
    }

    /**
     * Decoding an empty metadata string must yield {@code -1}.
     */
    @Test
    public void shouldReturnUnknownTimestampIfEmptyMessage() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");

        final long decoded = task.decodeTimestamp("");

        Assert.assertEquals(-1L, decoded);
    }

    /**
     * {@code process(0L)} must return {@code false} while no partition, or only partition1, has a
     * buffered record, and {@code true} once partition2 has one as well.
     */
    @Test
    public void shouldBeProcessableIfAllPartitionsBuffered() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // Empty buffers: nothing to process.
        Assert.assertFalse(task.process(0L));

        byte[] payload = ByteBuffer.allocate(4).putInt(1).array();

        // Only partition1 has data: still not processable.
        task.addRecords(partition1, Collections.singleton(new ConsumerRecord("topic1", 1, 0L, (Object)payload, (Object)payload)));
        Assert.assertFalse(task.process(0L));

        // Both partitions have data: processable.
        task.addRecords(partition2, Collections.singleton(new ConsumerRecord("topic2", 1, 0L, (Object)payload, (Object)payload)));
        Assert.assertTrue(task.process(0L));
    }

    /**
     * Checks "enforced processing": when only some partitions have buffered data, the task
     * refuses to process until enough wall-clock time has passed, and every such forced
     * processing step increments the {@code enforced-processing-total} task metric.
     * <p>
     * NOTE(review): presumably the {@code "100"} passed to {@code createConfig} is the idle
     * threshold in milliseconds, which would match the +99/+100 and +249/+250 boundaries
     * asserted below -- confirm against {@code createConfig}.
     */
    @Test
    public void shouldBeProcessableIfWaitedForTooLong() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        // Name of the task-level metric, tagged with the current thread name and the task id.
        MetricName enforcedProcessMetric = this.metrics.metricName("enforced-processing-total", "stream-task-metrics", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName()), Utils.mkEntry((Object)"task-id", (Object)this.taskId.toString())}));
        // No data at all: not processable and no enforced processing recorded.
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
        Assert.assertEquals((Object)0.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
        // Three records for partition1 only; partition2 stays empty.
        this.task.addRecords(this.partition1, Arrays.asList(new ConsumerRecord("topic1", 1, 0L, (Object)bytes, (Object)bytes), new ConsumerRecord("topic1", 1, 1L, (Object)bytes, (Object)bytes), new ConsumerRecord("topic1", 1, 2L, (Object)bytes, (Object)bytes)));
        // Not processable at +0 and +99, but processing is enforced at +100 and again at +101.
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds() + 99L));
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds() + 100L));
        Assert.assertEquals((Object)1.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds() + 101L));
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        // With a record on partition2 as well, processing succeeds without counting as enforced.
        this.task.addRecords(this.partition2, Collections.singleton(new ConsumerRecord("topic2", 1, 0L, (Object)bytes, (Object)bytes)));
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds() + 130L));
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        // Afterwards the task is not processable at +150 and +249; the metric stays at 2.
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds() + 150L));
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds() + 249L));
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        // At +250 processing is enforced once more and the metric reaches 3.
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds() + 250L));
        Assert.assertEquals((Object)3.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
    }

    /**
     * Complements the enforced-processing test: once the single buffered record has been
     * force-processed the task has no data left, so {@code process()} returns {@code false} and
     * the {@code enforced-processing-total} metric does not move until new data arrives and the
     * waiting period has passed again.
     * <p>
     * NOTE(review): presumably the {@code "100"} passed to {@code createConfig} is the idle
     * threshold in milliseconds -- confirm against {@code createConfig}.
     */
    @Test
    public void shouldNotBeProcessableIfNoDataAvailble() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        // Name of the task-level metric, tagged with the current thread name and the task id.
        MetricName enforcedProcessMetric = this.metrics.metricName("enforced-processing-total", "stream-task-metrics", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName()), Utils.mkEntry((Object)"task-id", (Object)this.taskId.toString())}));
        // No data at all: not processable and no enforced processing recorded.
        Assert.assertFalse((boolean)this.task.process(0L));
        Assert.assertEquals((Object)0.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
        // A single record for partition1 only.
        this.task.addRecords(this.partition1, Collections.singleton(new ConsumerRecord("topic1", 1, 0L, (Object)bytes, (Object)bytes)));
        // Not processable at +0 and +99; processing is enforced at +100.
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds() + 99L));
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds() + 100L));
        Assert.assertEquals((Object)1.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        // The only record is gone: nothing to process at +110 and the metric is unchanged.
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds() + 110L));
        Assert.assertEquals((Object)1.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        // A new partition1 record arrives; it is not processed at +150 or +249.
        this.task.addRecords(this.partition1, Collections.singleton(new ConsumerRecord("topic1", 1, 0L, (Object)bytes, (Object)bytes)));
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds() + 150L));
        Assert.assertEquals((Object)1.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds() + 249L));
        Assert.assertEquals((Object)1.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
        // At +250 processing is enforced again and the metric reaches 2.
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds() + 250L));
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(enforcedProcessMetric).metricValue());
    }

    /**
     * Advances the mock clock in steps of 10, 10, 9, 1 and 20 ms and checks after each step
     * whether a wall-clock punctuation fires. Punctuations are expected at +10, +20, +30 and +50
     * relative to the start time; the 9 ms step and the immediate repeat call must not fire.
     */
    @Test
    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        final long start = time.milliseconds();

        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        // 9 ms after the previous punctuation is too early ...
        time.sleep(9L);
        Assert.assertFalse(task.maybePunctuateSystemTime());

        // ... one more millisecond is enough.
        time.sleep(1L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        // A 20 ms jump yields a single punctuation, not two.
        time.sleep(20L);
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertFalse(task.maybePunctuateSystemTime());

        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, start + 10L, start + 20L, start + 30L, start + 50L);
    }

    /**
     * No wall-clock punctuation may fire immediately after restoration or after the mock clock
     * has advanced by only 9 ms; the processor must have recorded no punctuation at all.
     */
    @Test
    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(9L);
        Assert.assertFalse(task.maybePunctuateSystemTime());

        // An empty array means "expect zero recorded punctuations".
        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, new long[0]);
    }

    /**
     * Advances the mock clock by irregular amounts, including two large jumps (100 ms and
     * 105 ms), and checks that each call sequence yields at most one wall-clock punctuation per
     * jump. The punctuations finally recorded are at +100, +110, +122, +130, +235 and +240
     * relative to the start time.
     * <p>
     * NOTE(review): the punctuation interval is scheduled outside this method; the expected
     * values are consistent with a 10 ms interval aligned to the start time -- confirm.
     */
    @Test
    public void shouldPunctuateOnceSystemTimeAfterGap() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        long now = this.time.milliseconds();
        // +100: a large gap produces exactly one punctuation; an immediate second call does nothing.
        this.time.sleep(100L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        // +110
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        // +122
        this.time.sleep(12L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        // +129: no punctuation yet
        this.time.sleep(7L);
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        // +130
        this.time.sleep(1L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        // +235: second large gap, again only a single punctuation.
        this.time.sleep(105L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        // +240: only 5 ms after the previous punctuation, yet it fires.
        this.time.sleep(5L);
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100L, now + 110L, now + 122L, now + 130L, now + 235L, now + 240L);
    }

    /**
     * A {@link KafkaException} thrown by a stream-time punctuator must surface as a
     * {@code StreamsException} whose message names the punctuated processor, and the processor
     * context's current node must be {@code null} afterwards.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        // assertThrows replaces the try/fail/catch idiom; the failure message is kept unchanged.
        StreamsException thrown = Assert.assertThrows("Should've thrown StreamsException", StreamsException.class, () -> this.task.punctuate(this.processorStreamTime, 1L, PunctuationType.STREAM_TIME, timestamp -> {
            throw new KafkaException("KABOOM!");
        }));
        String message = thrown.getMessage();
        Assert.assertTrue("message=" + message + " should contain processor", message.contains("processor '" + this.processorStreamTime.name() + "'"));
        MatcherAssert.assertThat((Object)this.task.processorContext().currentNode(), (Matcher)CoreMatchers.nullValue());
    }

    /**
     * A {@link KafkaException} thrown by a wall-clock punctuator must surface as a
     * {@code StreamsException} whose message names the punctuated processor, and the processor
     * context's current node must be {@code null} afterwards.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        // assertThrows replaces the try/fail/catch idiom; the failure message is kept unchanged.
        StreamsException thrown = Assert.assertThrows("Should've thrown StreamsException", StreamsException.class, () -> this.task.punctuate(this.processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            throw new KafkaException("KABOOM!");
        }));
        String message = thrown.getMessage();
        Assert.assertTrue("message=" + message + " should contain processor", message.contains("processor '" + this.processorSystemTime.name() + "'"));
        MatcherAssert.assertThat((Object)this.task.processorContext().currentNode(), (Matcher)CoreMatchers.nullValue());
    }

    /**
     * A header added to the processor context during one punctuation must not be visible during
     * the next punctuation.
     */
    @Test
    public void shouldNotShareHeadersBetweenPunctuateIterations() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // First punctuation: leave a header behind.
        task.punctuate(processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, timestamp -> task.processorContext().headers().add("dummy", (byte[])null));

        // Second punctuation: the headers seen here must be empty.
        task.punctuate(processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, timestamp -> Assert.assertFalse(task.processorContext().headers().iterator().hasNext()));
    }

    /**
     * Processing a record on the task built by {@code createFaultyStatefulTask} must fail with a
     * {@code StreamsException}.
     */
    @Test
    public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() {
        // The mocks answer every lookup with empty collections, any number of times.
        EasyMock.expect((Object)stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object)stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect((Object)recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay((Object[])new Object[]{stateManager, recordCollector});

        task = createFaultyStatefulTask(StreamTaskTest.createConfig(false, "100"));
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Arrays.asList(getConsumerRecord(partition1, 10L), getConsumerRecord(partition1, 20L), getConsumerRecord(partition1, 30L)));
        task.addRecords(partition2, Arrays.asList(getConsumerRecord(partition2, 5L), getConsumerRecord(partition2, 35L), getConsumerRecord(partition2, 45L)));

        Assert.assertThrows(StreamsException.class, () -> task.process(0L));
    }

    /**
     * On the task built by {@code createDisconnectedTask}, {@code completeRestoration()} must
     * propagate a {@link TimeoutException}.
     *
     * @throws IOException declared because {@code stateDirectory.lock} is invoked while recording the mock
     */
    @Test
    public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() throws IOException {
        // Replace the state directory with a nice mock that always grants the task lock.
        stateDirectory = (StateDirectory)EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect((Object)stateDirectory.lock(taskId)).andReturn((Object)true);

        EasyMock.expect((Object)stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object)stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay((Object[])new Object[]{recordCollector, stateDirectory, stateManager});

        task = createDisconnectedTask(StreamTaskTest.createConfig(false, "100"));
        task.initializeIfNeeded();

        Assert.assertThrows(TimeoutException.class, task::completeRestoration);
    }

    /**
     * Walks a stateful task through initialize -> suspend -> resume -> completeRestoration and
     * checks the reported state at each step. The source nodes must stay uninitialized while the
     * task is SUSPENDED or RESTORING and be initialized once it is RUNNING.
     *
     * @throws IOException declared because {@code stateDirectory.lock} is invoked while recording the mock
     */
    @Test
    public void shouldReInitializeTopologyWhenResuming() throws IOException {
        // Replace the state directory with a nice mock that grants the task lock.
        this.stateDirectory = (StateDirectory)EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect((Object)this.stateDirectory.lock(this.taskId)).andReturn((Object)true);
        // Any attempt to read the record collector's offsets during this scenario fails the test.
        EasyMock.expect((Object)this.recordCollector.offsets()).andThrow((Throwable)((Object)new AssertionError((Object)"Should not try to read offsets"))).anyTimes();
        EasyMock.expect((Object)this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.recordCollector, this.stateDirectory, this.stateManager});
        this.task = this.createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        this.task.initializeIfNeeded();
        // Suspended: topology not initialized.
        this.task.suspend();
        Assert.assertEquals((Object)Task.State.SUSPENDED, (Object)this.task.state());
        Assert.assertFalse((boolean)this.source1.initialized);
        Assert.assertFalse((boolean)this.source2.initialized);
        // Resumed: the task is RESTORING, topology still not initialized.
        this.task.resume();
        Assert.assertEquals((Object)Task.State.RESTORING, (Object)this.task.state());
        Assert.assertFalse((boolean)this.source1.initialized);
        Assert.assertFalse((boolean)this.source2.initialized);
        // Restoration completed: the task is RUNNING and both sources are initialized.
        this.task.completeRestoration();
        Assert.assertEquals((Object)Task.State.RUNNING, (Object)this.task.state());
        Assert.assertTrue((boolean)this.source1.initialized);
        Assert.assertTrue((boolean)this.source2.initialized);
        EasyMock.verify((Object[])new Object[]{this.stateManager, this.recordCollector});
        // Re-arm the record collector so that a later offsets() call returns an empty map instead of throwing.
        // NOTE(review): presumably needed by test teardown, which is not visible here -- confirm.
        EasyMock.reset((Object[])new Object[]{this.recordCollector});
        EasyMock.expect((Object)this.recordCollector.offsets()).andReturn(Collections.emptyMap());
        EasyMock.replay((Object[])new Object[]{this.recordCollector});
    }

    /**
     * Two commit cycles are run while the mocked changelog offset only moves from 10 to 20.
     * The state manager is expected to checkpoint exactly once across both cycles, which the
     * final {@code verify} enforces.
     * <p>
     * NOTE(review): presumably the first {@code postCommit(true)} forces the single checkpoint and
     * the second {@code postCommit(false)} skips it because the offset delta is small -- confirm
     * against {@code StreamTask.postCommit}.
     */
    @Test
    public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
        Long offset = 543L;
        EasyMock.expect((Object)this.recordCollector.offsets()).andReturn(Collections.singletonMap(this.changelogPartition, offset)).anyTimes();
        // checkpoint() must be called exactly once.
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        // Consecutive changelogOffsets() calls report 10, then 20.
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.changelogPartition, 10L)).andReturn(Collections.singletonMap(this.changelogPartition, 20L));
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.recordCollector});
        this.task = this.createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        // First commit cycle.
        this.task.prepareCommit();
        this.task.postCommit(true);
        // Second commit cycle.
        this.task.prepareCommit();
        this.task.postCommit(false);
        EasyMock.verify((Object[])new Object[]{this.stateManager, this.recordCollector});
    }

    /**
     * Two commit cycles are run while the mocked changelog offset moves from 0 to 10 and then to
     * 12000; the recorded expectation is that the state manager checkpoints twice.
     * <p>
     * NOTE(review): only {@code recordCollector} is passed to {@code EasyMock.verify}, so a missing
     * {@code stateManager.checkpoint()} call would not be detected by this test -- confirm whether
     * verifying {@code stateManager} as well was intended.
     */
    @Test
    public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() {
        Long offset = 543L;
        EasyMock.expect((Object)this.recordCollector.offsets()).andReturn(Collections.singletonMap(this.changelogPartition, offset)).anyTimes();
        // checkpoint() is recorded with an expected call count of two.
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().times(2);
        EasyMock.expect((Object)this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.changelogPartition));
        // Consecutive changelogOffsets() calls report 0, then 10, then 12000.
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.changelogPartition, 0L)).andReturn(Collections.singletonMap(this.changelogPartition, 10L)).andReturn(Collections.singletonMap(this.changelogPartition, 12000L));
        // The state store is expected to be registered with its restore callback.
        this.stateManager.registerStore((StateStore)this.stateStore, this.stateStore.stateRestoreCallback);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.recordCollector});
        this.task = this.createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        // First commit cycle.
        this.task.prepareCommit();
        this.task.postCommit(true);
        // Second commit cycle.
        this.task.prepareCommit();
        this.task.postCommit(false);
        EasyMock.verify((Object[])new Object[]{this.recordCollector});
    }

    /**
     * With the first {@code createConfig} argument set to {@code true} (EOS, per the test name),
     * a commit cycle ending in {@code postCommit(false)} must not leave a {@code .checkpoint}
     * file in the task's state directory.
     */
    @Test
    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
        EasyMock.expect((Object)stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
        // The state store is expected to be registered with its restore callback.
        stateManager.registerStore((StateStore)stateStore, stateStore.stateRestoreCallback);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay((Object[])new Object[]{stateManager, recordCollector});

        task = createStatefulTask(StreamTaskTest.createConfig(true, "100"), true);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.prepareCommit();
        task.postCommit(false);

        boolean checkpointWritten = new File(stateDirectory.directoryForTask(taskId), ".checkpoint").exists();
        Assert.assertFalse(checkpointWritten);
    }

    /**
     * Calling {@code punctuate} while the processor context already has a current node must fail
     * with an {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        // Simulate a node that is still being processed.
        this.task.processorContext().setCurrentNode(this.processorStreamTime);
        // assertThrows replaces try/fail with an empty catch block; the failure message is kept unchanged.
        Assert.assertThrows("Should throw illegal state exception as current node is not null", IllegalStateException.class, () -> this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator));
    }

    /**
     * Each {@code punctuate} call must invoke the test's punctuator with the timestamp passed in;
     * this is observed through the {@code punctuatedAt} field after each call.
     */
    @Test
    public void shouldCallPunctuateOnPassedInProcessorNode() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.punctuate(processorStreamTime, 5L, PunctuationType.STREAM_TIME, punctuator);
        MatcherAssert.assertThat((Object)punctuatedAt, (Matcher)CoreMatchers.equalTo((Object)5L));

        task.punctuate(processorStreamTime, 10L, PunctuationType.STREAM_TIME, punctuator);
        MatcherAssert.assertThat((Object)punctuatedAt, (Matcher)CoreMatchers.equalTo((Object)10L));
    }

    /**
     * After a successful {@code punctuate} call the processor context must no longer reference
     * a current node.
     */
    @Test
    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccessfulPunctuate() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.punctuate(processorStreamTime, 5L, PunctuationType.STREAM_TIME, punctuator);

        MatcherAssert.assertThat((Object)task.processorContext().currentNode(), (Matcher)CoreMatchers.nullValue());
    }

    /**
     * Scheduling a punctuation on a freshly created task, without any current node having been
     * set on its context, is expected to raise an {@link IllegalStateException}.
     */
    @Test(expected=IllegalStateException.class)
    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
        final StreamsConfig config = StreamTaskTest.createConfig(false, "100");
        task = createStatelessTask(config, "latest");
        // The punctuator is a no-op; only the schedule call itself matters here.
        task.schedule(1L, PunctuationType.STREAM_TIME, ignoredTimestamp -> { });
    }

    /**
     * Scheduling a punctuation must succeed once a current node has been set on the task's
     * processor context; the test passes if no exception escapes.
     */
    @Test
    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
        final StreamsConfig config = StreamTaskTest.createConfig(false, "100");
        task = createStatelessTask(config, "latest");
        task.processorContext().setCurrentNode(processorStreamTime);
        // The punctuator is a no-op; only the schedule call itself matters here.
        task.schedule(1L, PunctuationType.STREAM_TIME, ignoredTimestamp -> { });
    }

    /**
     * A failing {@code suspend()} on a running task must not prevent {@code closeDirty()} from
     * closing the state manager: the recorded {@code stateManager.close()} expectation is
     * verified at the end of the test.
     */
    @Test
    public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
        // A stray expectLastCall() that used to follow this already-complete expectation has
        // been dropped; it configured nothing.
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        // Record the expected close() call on the state manager.
        stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createFaultyStatefulTask(StreamTaskTest.createConfig(false, "100"));
        task.initializeIfNeeded();
        task.completeRestoration();

        // The faulty task is expected to blow up on suspend ...
        Assert.assertThrows(RuntimeException.class, () -> task.suspend());
        // ... and the dirty close must still go through.
        task.closeDirty();

        EasyMock.verify(stateManager);
    }

    /**
     * Processes one record from a regular source partition and one from a repartition-topic
     * partition, then checks that the purgeable offsets contain only the repartition partition,
     * mapped to 11L (its record is created with 10L).
     */
    @Test
    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
        final TopicPartition repartition = new TopicPartition("repartition", 1);
        final ProcessorTopology topology = StreamTaskTest.withRepartitionTopics(
            Arrays.asList(new ProcessorNode[]{source1, source2}),
            Utils.mkMap((Map.Entry[]) new Map.Entry[]{
                Utils.mkEntry((Object) "topic1", source1),
                Utils.mkEntry((Object) repartition.topic(), source2)}),
            Collections.singleton(repartition.topic()));

        consumer.assign(Arrays.asList(partition1, repartition));
        consumer.updateBeginningOffsets(
            Utils.mkMap((Map.Entry[]) new Map.Entry[]{Utils.mkEntry((Object) repartition, (Object) 0L)}));

        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        final StreamsConfig config = StreamTaskTest.createConfig(false, "0");
        final ProcessorContextImpl context = new ProcessorContextImpl(taskId, config, stateManager, streamsMetrics, null);
        task = new StreamTask(
            taskId,
            Utils.mkSet((Object[]) new TopicPartition[]{partition1, repartition}),
            topology,
            consumer,
            config,
            streamsMetrics,
            stateDirectory,
            cache,
            (Time) time,
            stateManager,
            recordCollector,
            (InternalProcessorContext) context);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 5L)));
        task.addRecords(repartition, Collections.singletonList(getConsumerRecord(repartition, 10L)));

        // Two buffered records, hence two successful process() calls.
        Assert.assertTrue(task.process(0L));
        Assert.assertTrue(task.process(0L));

        task.prepareCommit();

        MatcherAssert.assertThat(
            (Object) task.purgeableOffsets(),
            (Matcher) CoreMatchers.equalTo(Collections.singletonMap(repartition, 11L)));
    }

    /**
     * When the consumer's {@code committed(...)} lookup fails with a {@link KafkaException},
     * {@code completeRestoration()} is expected to surface a {@code StreamsException}.
     */
    @Test
    public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1));
        EasyMock.replay(stateManager);

        // Consumer stub whose committed-offset lookup always fails.
        final MockConsumer<byte[], byte[]> failingConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {

            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
                throw new KafkaException("KABOOM!");
            }
        };

        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) failingConsumer);
        task.initializeIfNeeded();

        Assert.assertThrows(StreamsException.class, () -> task.completeRestoration());
    }

    /**
     * {@code prepareCommit()} on a task that has been moved to {@code CLOSED} must fail with an
     * {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIfCommittingOnIllegalState() {
        final StreamsConfig config = StreamTaskTest.createConfig(false, "100");
        task = createStatelessTask(config, "latest");

        // Drive the task to CLOSED via SUSPENDED.
        task.transitionTo(Task.State.SUSPENDED);
        task.transitionTo(Task.State.CLOSED);

        Assert.assertThrows(IllegalStateException.class, () -> task.prepareCommit());
    }

    /**
     * {@code postCommit(true)} on a task that has been moved to {@code CLOSED} must fail with an
     * {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIfPostCommittingOnIllegalState() {
        final StreamsConfig config = StreamTaskTest.createConfig(false, "100");
        task = createStatelessTask(config, "latest");

        // Drive the task to CLOSED via SUSPENDED.
        task.transitionTo(Task.State.SUSPENDED);
        task.transitionTo(Task.State.CLOSED);

        Assert.assertThrows(IllegalStateException.class, () -> task.postCommit(true));
    }

    /**
     * A task suspended straight from its initial state must not checkpoint on post-commit: the
     * state manager mock raises an {@link AssertionError} if {@code checkpoint()} is reached.
     */
    @Test
    public void shouldSkipCheckpointingSuspendedCreatedTask() {
        stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
        EasyMock.replay(stateManager);

        task = createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        // No initializeIfNeeded() here: the task is suspended without ever being initialized.
        task.suspend();
        task.postCommit(true);
    }

    /**
     * A task that was initialized and then suspended must checkpoint exactly once when
     * {@code postCommit(true)} is called.
     */
    @Test
    public void shouldCheckpointForSuspendedTask() {
        stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition1, 1L));
        EasyMock.replay(stateManager);

        task = createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        task.initializeIfNeeded();
        task.suspend();
        task.postCommit(true);

        EasyMock.verify(stateManager);
    }

    /**
     * With the mocked changelog offset moving only from 1 to 2, neither the post-commit of the
     * running task nor the one after suspension (both non-enforced) may checkpoint; any
     * {@code checkpoint()} call makes the mock throw an {@link AssertionError}.
     */
    @Test
    public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
        EasyMock.expect((Object) stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(partition1, 1L))
            .andReturn(Collections.singletonMap(partition1, 2L));
        stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        EasyMock.replay(stateManager);

        task = createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        task.initializeIfNeeded();
        task.completeRestoration();

        // Commit cycle while running.
        task.prepareCommit();
        task.postCommit(false);

        // Commit cycle after suspension.
        task.suspend();
        task.postCommit(false);

        EasyMock.verify(stateManager);
    }

    /**
     * With the mocked changelog offset jumping to 12000 and then 24000, both non-enforced
     * post-commits (while running and after suspension) are expected to checkpoint, i.e.
     * {@code checkpoint()} must be called twice.
     */
    @Test
    public void shouldCheckpointForSuspendedRunningTaskWithLargeProgress() {
        EasyMock.expect((Object) stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(partition1, 12000L))
            .andReturn(Collections.singletonMap(partition1, 24000L));
        stateManager.checkpoint();
        EasyMock.expectLastCall().times(2);
        EasyMock.replay(stateManager);

        task = createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        task.initializeIfNeeded();
        task.completeRestoration();

        // Commit cycle while running.
        task.prepareCommit();
        task.postCommit(false);

        // Commit cycle after suspension.
        task.suspend();
        task.postCommit(false);

        EasyMock.verify(stateManager);
    }

    /**
     * After processing a record and suspending, an enforced post-commit must push the record
     * collector's offsets into the state manager via {@code updateChangelogOffsets} and
     * checkpoint once.
     */
    @Test
    public void shouldCheckpointWhileUpdateSnapshotWithTheConsumedOffsetsForSuspendedRunningTask() {
        final Map<TopicPartition, Long> checkpointableOffsets = Collections.singletonMap(partition1, 1L);

        stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        stateManager.updateChangelogOffsets((Map) EasyMock.eq(checkpointableOffsets));
        EasyMock.expectLastCall().once();
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(checkpointableOffsets);
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(checkpointableOffsets).once();
        EasyMock.replay(stateManager, recordCollector);

        task = createStatefulTask(StreamTaskTest.createConfig(false, "0"), true);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Collections.singleton(getConsumerRecord(partition1, 10L)));
        task.process(100L);
        Assert.assertTrue(task.commitNeeded());

        task.suspend();
        task.postCommit(true);

        EasyMock.verify(stateManager, recordCollector);
    }

    /**
     * Before restoration completes, the task reports the state manager's changelog offsets
     * (50L here); after {@code completeRestoration()} the same partition is expected to be
     * reported as -2L.
     */
    @Test
    public void shouldReturnStateManagerChangelogOffsets() {
        EasyMock.expect((Object) stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(partition1, 50L))
            .anyTimes();
        EasyMock.expect((Object) stateManager.changelogPartitions())
            .andReturn(Collections.singleton(partition1))
            .anyTimes();
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.initializeIfNeeded();
        Assert.assertEquals(Collections.singletonMap(partition1, 50L), (Object) task.changelogOffsets());

        task.completeRestoration();
        // NOTE(review): -2L presumably is a sentinel for "latest"; confirm against the task implementation.
        Assert.assertEquals(Collections.singletonMap(partition1, -2L), (Object) task.changelogOffsets());
    }

    /**
     * Closing a task that was never initialized must neither flush nor checkpoint the state
     * manager (both mock calls throw {@link AssertionError} if reached), must leave the source
     * node neither initialized nor closed, and must record one close in the close-task metric.
     */
    @Test
    public void shouldNotCheckpointOnCloseCreated() {
        stateManager.flush();
        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
        stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        final MetricName metricName = setupCloseTaskMetric();
        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);

        task.suspend();
        task.closeClean();

        Assert.assertEquals(Task.State.CLOSED, task.state());
        Assert.assertFalse(source1.initialized);
        Assert.assertFalse(source1.closed);
        EasyMock.verify(stateManager, recordCollector);

        // The named expectation is now actually passed on instead of a duplicated literal.
        final double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
    }

    /**
     * With empty changelog offsets, suspending, committing with an enforced post-commit and
     * clean-closing the task must flush and checkpoint the state manager exactly once each and
     * leave the task in {@code CLOSED}.
     */
    @Test
    public void shouldCheckpointOnCloseRestoringIfNoProgress() {
        stateManager.flush();
        EasyMock.expectLastCall().once();
        stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.suspend();
        task.prepareCommit();
        task.postCommit(true);
        task.closeClean();

        Assert.assertEquals(Task.State.CLOSED, task.state());
        EasyMock.verify(stateManager);
    }

    /**
     * After processing a record and suspending, a non-enforced post-commit is expected to
     * checkpoint the state manager exactly once, with the task remaining {@code SUSPENDED}.
     */
    @Test
    public void shouldCheckpointOffsetsOnPostCommit() {
        // Value reported by the record collector for the changelog partition.
        final long offset = 543L;
        // Value used to build the consumed input record.
        final long consumedOffset = 345L;

        // A stray expectLastCall() that used to follow this already-complete expectation has
        // been dropped; it configured nothing.
        EasyMock.expect((Object) recordCollector.offsets())
            .andReturn(Collections.singletonMap(changelogPartition, offset))
            .anyTimes();
        stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition1, 12543L));
        EasyMock.replay(recordCollector, stateManager);

        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "0"),
            (Consumer<byte[], byte[]>) consumer);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, consumedOffset)));
        task.process(100L);
        Assert.assertTrue(task.commitNeeded());

        task.suspend();
        task.prepareCommit();
        task.postCommit(false);

        Assert.assertEquals(Task.State.SUSPENDED, task.state());
        EasyMock.verify(stateManager);
    }

    /**
     * When {@code stateManager.close()} throws during {@code closeClean()}, the
     * {@code ProcessorStateException} must propagate and the close-task metric must stay at 0.
     */
    @Test
    public void shouldThrowExceptionOnCloseCleanError() {
        // Shared value for the mocked changelog offset and the consumed input record.
        final long offset = 543L;

        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect((Object) stateManager.changelogPartitions())
            .andReturn(Collections.singleton(changelogPartition))
            .anyTimes();
        EasyMock.expect((Object) stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(changelogPartition, offset))
            .anyTimes();
        stateManager.close();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        EasyMock.replay(recordCollector, stateManager);

        final MetricName metricName = setupCloseTaskMetric();
        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, offset)));
        task.process(100L);
        Assert.assertTrue(task.commitNeeded());

        task.suspend();
        task.prepareCommit();
        task.postCommit(true);
        Assert.assertThrows(ProcessorStateException.class, () -> task.closeClean());

        final double expectedCloseTaskMetric = 0.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
        EasyMock.verify(stateManager);

        // Re-arm the mock with a close() that succeeds.
        // NOTE(review): presumably needed by cleanup code outside this method; confirm.
        EasyMock.reset(stateManager);
        EasyMock.expect((Object) stateManager.changelogPartitions())
            .andStubReturn(Collections.singleton(changelogPartition));
        stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.replay(stateManager);
    }

    /**
     * When {@code stateManager.flushCache()} throws, {@code prepareCommit()} must propagate the
     * {@code ProcessorStateException}, the task must stay {@code RUNNING}, and flush, checkpoint
     * and close must never be reached (each of those mock calls throws {@link AssertionError}).
     */
    @Test
    public void shouldThrowOnCloseCleanFlushError() {
        // Shared value for the record collector offset and the consumed input record.
        final long offset = 543L;

        EasyMock.expect((Object) recordCollector.offsets())
            .andReturn(Collections.singletonMap(changelogPartition, offset));
        stateManager.flushCache();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        stateManager.flush();
        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
        stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        stateManager.close();
        EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(recordCollector, stateManager);

        final MetricName metricName = setupCloseTaskMetric();
        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, offset)));
        task.process(100L);

        Assert.assertThrows(ProcessorStateException.class, () -> task.prepareCommit());
        Assert.assertEquals(Task.State.RUNNING, task.state());

        final double expectedCloseTaskMetric = 0.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
        EasyMock.verify(stateManager);

        // Re-arm the mock without the throwing expectations.
        // NOTE(review): presumably needed by cleanup code outside this method; confirm.
        EasyMock.reset(stateManager);
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(stateManager);
    }

    /**
     * When {@code stateManager.checkpoint()} throws, an enforced {@code postCommit} must
     * propagate the {@code ProcessorStateException}, the task must stay {@code SUSPENDED}, and
     * the state manager must not be closed (that mock call throws {@link AssertionError}).
     */
    @Test
    public void shouldThrowOnCloseCleanCheckpointError() {
        // Shared value for the mocked changelog offset and the consumed input record.
        final long offset = 54300L;

        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap());
        stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        stateManager.close();
        EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition1, offset));
        EasyMock.replay(recordCollector, stateManager);

        final MetricName metricName = setupCloseTaskMetric();
        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.initializeIfNeeded();

        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, offset)));
        task.process(100L);
        Assert.assertTrue(task.commitNeeded());

        task.suspend();
        task.prepareCommit();
        Assert.assertThrows(ProcessorStateException.class, () -> task.postCommit(true));
        Assert.assertEquals(Task.State.SUSPENDED, task.state());

        final double expectedCloseTaskMetric = 0.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
        EasyMock.verify(stateManager);

        // Re-arm the mock with a close() that succeeds.
        // NOTE(review): presumably needed by cleanup code outside this method; confirm.
        EasyMock.reset(stateManager);
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.replay(stateManager);
    }

    /**
     * {@code closeDirty()} must swallow a {@link RuntimeException} thrown by
     * {@code stateManager.close()}; the test passes if nothing escapes.
     */
    @Test
    public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
        stateManager.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager);

        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.initializeIfNeeded();

        task.suspend();
        task.closeDirty();

        EasyMock.verify(stateManager);
    }

    /**
     * Task-tagged metrics exist while the task is suspended and must all be gone after
     * {@code closeClean()}.
     */
    @Test
    public void shouldUnregisterMetricsInCloseClean() {
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.suspend();

        // Sanity check: there is something to unregister.
        MatcherAssert.assertThat(getTaskMetrics(), (Matcher) Matchers.not((Matcher) Matchers.empty()));

        task.closeClean();

        MatcherAssert.assertThat(getTaskMetrics(), (Matcher) Matchers.empty());
    }

    /**
     * Task-tagged metrics exist while the task is suspended and must all be gone after
     * {@code closeDirty()}.
     */
    @Test
    public void shouldUnregisterMetricsInCloseDirty() {
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.suspend();

        // Sanity check: there is something to unregister.
        MatcherAssert.assertThat(getTaskMetrics(), (Matcher) Matchers.not((Matcher) Matchers.empty()));

        task.closeDirty();

        MatcherAssert.assertThat(getTaskMetrics(), (Matcher) Matchers.empty());
    }

    /**
     * Task-tagged metrics exist while the task is suspended and must all be gone after
     * {@code closeCleanAndRecycleState()}.
     */
    @Test
    public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.suspend();

        // Sanity check: there is something to unregister.
        MatcherAssert.assertThat(getTaskMetrics(), (Matcher) Matchers.not((Matcher) Matchers.empty()));

        task.closeCleanAndRecycleState();

        MatcherAssert.assertThat(getTaskMetrics(), (Matcher) Matchers.empty());
    }

    /**
     * Closing an already closed task again, cleanly or dirtily, must be accepted without an
     * exception.
     */
    @Test
    public void closeShouldBeIdempotent() {
        EasyMock.expect((Object) stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect((Object) recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createOptimizedStatefulTask(
            StreamTaskTest.createConfig(false, "100"),
            (Consumer<byte[], byte[]>) consumer);
        task.suspend();
        task.closeClean();

        // Repeated closes of either flavour must be no-ops.
        task.closeClean();
        task.closeDirty();

        // Leave the state manager mock in replay mode with no expectations recorded.
        // NOTE(review): presumably for cleanup code outside this method; confirm.
        EasyMock.reset(stateManager);
        EasyMock.replay(stateManager);
    }

    /**
     * After {@code update(...)} with an extended partition set, the task must report exactly
     * that set as its input partitions.
     */
    @Test
    public void shouldUpdatePartitions() {
        task = createStatelessTask(StreamTaskTest.createConfig(false, "0"), "latest");

        // Existing input partitions plus one partition of a new topic.
        final HashSet<TopicPartition> newPartitions = new HashSet<TopicPartition>(task.inputPartitions());
        newPartitions.add(new TopicPartition("newTopic", 0));

        task.update(
            newPartitions,
            Utils.mkMap((Map.Entry[]) new Map.Entry[]{
                Utils.mkEntry((Object) source1.name(), Arrays.asList("topic1", "newTopic")),
                Utils.mkEntry((Object) source2.name(), Collections.singletonList("topic2"))}));

        MatcherAssert.assertThat((Object) task.inputPartitions(), (Matcher) CoreMatchers.equalTo(newPartitions));
    }

    /**
     * A running task that still needs a commit must refuse {@code closeClean()} with a
     * {@code TaskMigratedException}.
     */
    @Test
    public void shouldThrowIfCleanClosingDirtyTask() {
        final StreamsConfig config = StreamTaskTest.createConfig(false, "0");
        task = createStatelessTask(config, "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // Process one record so that the task has uncommitted work.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process(0L);
        Assert.assertTrue(task.commitNeeded());

        Assert.assertThrows(TaskMigratedException.class, () -> task.closeClean());
    }

    /**
     * A running task that still needs a commit must refuse {@code closeCleanAndRecycleState()}
     * with a {@code TaskMigratedException}.
     */
    @Test
    public void shouldThrowIfRecyclingDirtyTask() {
        final StreamsConfig config = StreamTaskTest.createConfig(false, "0");
        task = createStatelessTask(config, "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // Process one record so that the task has uncommitted work.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process(0L);
        Assert.assertTrue(task.commitNeeded());

        Assert.assertThrows(TaskMigratedException.class, () -> task.closeCleanAndRecycleState());
    }

    /**
     * {@code closeCleanAndRecycleState()} must be rejected with {@link IllegalStateException}
     * before initialization, after initialization and after restoration, and must succeed only
     * once the task has been suspended.
     */
    @Test
    public void shouldOnlyRecycleSuspendedTasks() {
        // Calls expected from the single successful recycle at the end.
        stateManager.recycle();
        recordCollector.closeClean();
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        Assert.assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState());

        task.initializeIfNeeded();
        Assert.assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState());

        task.completeRestoration();
        Assert.assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState());

        task.suspend();
        task.closeCleanAndRecycleState();

        EasyMock.verify(stateManager, recordCollector);
    }

    /** A task in {@code CREATED} must move to {@code SUSPENDED} when suspended. */
    @Test
    public void shouldAlwaysSuspendCreatedTasks() {
        EasyMock.replay(stateManager);

        task = createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        MatcherAssert.assertThat((Object) task.state(), (Matcher) CoreMatchers.equalTo((Object) Task.State.CREATED));

        task.suspend();

        MatcherAssert.assertThat((Object) task.state(), (Matcher) CoreMatchers.equalTo((Object) Task.State.SUSPENDED));
    }

    /** A task in {@code RESTORING} must move to {@code SUSPENDED} when suspended. */
    @Test
    public void shouldAlwaysSuspendRestoringTasks() {
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager);

        task = createStatefulTask(StreamTaskTest.createConfig(false, "100"), true);
        task.initializeIfNeeded();
        MatcherAssert.assertThat((Object) task.state(), (Matcher) CoreMatchers.equalTo((Object) Task.State.RESTORING));

        task.suspend();

        MatcherAssert.assertThat((Object) task.state(), (Matcher) CoreMatchers.equalTo((Object) Task.State.SUSPENDED));
    }

    /**
     * Even when {@code suspend()} throws on a running (faulty) task, the task must still end
     * up in {@code SUSPENDED}.
     */
    @Test
    public void shouldAlwaysSuspendRunningTasks() {
        EasyMock.expect((Object) stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager);

        task = createFaultyStatefulTask(StreamTaskTest.createConfig(false, "100"));
        task.initializeIfNeeded();
        task.completeRestoration();
        MatcherAssert.assertThat((Object) task.state(), (Matcher) CoreMatchers.equalTo((Object) Task.State.RUNNING));

        // The suspend itself fails ...
        Assert.assertThrows(RuntimeException.class, () -> task.suspend());

        // ... but the state transition must have happened regardless.
        MatcherAssert.assertThat((Object) task.state(), (Matcher) CoreMatchers.equalTo((Object) Task.State.SUSPENDED));
    }

    /**
     * Builds a topology without any source nodes or topic mappings and expects the code run
     * by the lambda below to fail with a {@code TopologyException} carrying a specific message.
     */
    @Test
    public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, StreamTaskTest.createConfig(false, "100"), this.stateManager, this.streamsMetrics, null);
        // Note: this local is built from the field this.metrics and is distinct from this.streamsMetrics.
        StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", "latest", (Time)this.time);
        EasyMock.expect((Object)this.stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.replay((Object[])new Object[]{this.stateManager});
        // Empty topology: no processor nodes and no topic-to-source mapping.
        ProcessorTopology topology = StreamTaskTest.withSources(Arrays.asList(new ProcessorNode[0]), Utils.mkMap((Map.Entry[])new Map.Entry[0]));
        // NOTE(review): lambda$...$18 is a compiler-synthesized method left behind by the decompiler;
        // its body is not visible here. Going by the test name it presumably constructs a StreamTask
        // with the topology, metrics and context above -- confirm against the original source.
        TopologyException exception = (TopologyException)Assert.assertThrows(TopologyException.class, () -> this.lambda$shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic$18(topology, metrics, (InternalProcessorContext)context));
        // NOTE(review): "unkown" is misspelled in the expected text; it presumably mirrors the message
        // produced by the code under test, so it must not be corrected here in isolation.
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Invalid topology: Topic is unkown to the topology. This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order."));
    }

    /**
     * After the timeout has been initialized at time 0, a call at exactly five minutes must
     * still be accepted, while a call one millisecond later must raise a
     * {@link TimeoutException}.
     */
    @Test
    public void shouldInitTaskTimeoutAndEventuallyThrow() {
        final Logger log = new LogContext().logger(StreamTaskTest.class);
        final long fiveMinutesMs = Duration.ofMinutes(5L).toMillis();

        task = createStatelessTask(StreamTaskTest.createConfig(false, "0"), "latest");

        task.maybeInitTaskTimeoutOrThrow(0L, null, log);
        task.maybeInitTaskTimeoutOrThrow(fiveMinutesMs, null, log);

        Assert.assertThrows(
            TimeoutException.class,
            () -> task.maybeInitTaskTimeoutOrThrow(fiveMinutesMs + 1L, null, log));
    }

    /**
     * Once the task timeout has been cleared, a later call at five minutes plus one
     * millisecond must not throw; the test passes if no exception escapes.
     */
    @Test
    public void shouldCLearTaskTimeout() {
        final Logger log = new LogContext().logger(StreamTaskTest.class);
        final long justPastFiveMinutesMs = Duration.ofMinutes(5L).toMillis() + 1L;

        task = createStatelessTask(StreamTaskTest.createConfig(false, "0"), "latest");

        task.maybeInitTaskTimeoutOrThrow(0L, null, log);
        task.clearTaskTimeout(log);

        task.maybeInitTaskTimeoutOrThrow(justPastFiveMinutesMs, null, log);
    }

    /**
     * Collects the names of all currently registered metrics whose tags contain the key
     * {@code task-id}.
     *
     * @return the matching metric names, possibly empty
     */
    private List<MetricName> getTaskMetrics() {
        return metrics.metrics()
            .keySet()
            .stream()
            .filter(name -> name.tags().containsKey("task-id"))
            .collect(Collectors.toList());
    }

    /**
     * Builds a stateful task over {@code partition1} only. Its topology has {@code source1} as
     * the single node (registered for "topic1") and one store named "store" whose changelog
     * topic is mapped to "topic1", i.e. the source topic itself.
     *
     * @param config   streams configuration handed to both the processor context and the task
     * @param consumer consumer the task is created with
     * @return the newly constructed task
     */
    private StreamTask createOptimizedStatefulTask(StreamsConfig config, Consumer<byte[], byte[]> consumer) {
        MockKeyValueStore stateStore = new MockKeyValueStore("store", true);
        ProcessorTopology topology = ProcessorTopologyFactories.with(
            Collections.singletonList(this.source1),
            Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1)}),
            Collections.singletonList(stateStore),
            Collections.singletonMap("store", "topic1")
        );
        InternalProcessorContext context =
            new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        // Pass the partition as a plain vararg so mkSet infers Set<TopicPartition>. Casting the
        // argument to Object[] pins the element type to Object, which the constructor's
        // Set<TopicPartition> parameter cannot accept.
        Set<TopicPartition> assignedPartitions = Utils.mkSet(this.partition1);
        return new StreamTask(
            this.taskId,
            assignedPartitions,
            topology,
            consumer,
            config,
            this.streamsMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            context
        );
    }

    /**
     * Builds a stateful task (non-logged store "store", sources for "topic1" and "topic2")
     * backed by a {@link MockConsumer} whose {@code committed(Set)} lookup always fails with a
     * {@link TimeoutException}.
     *
     * @param config streams configuration handed to both the processor context and the task
     * @return the newly constructed task
     */
    private StreamTask createDisconnectedTask(StreamsConfig config) {
        MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
        ProcessorTopology topology = ProcessorTopologyFactories.with(
            Arrays.asList(new ProcessorNode[]{this.source1, this.source2}),
            Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}),
            Collections.singletonList(stateStore),
            Collections.emptyMap()
        );
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {

            // @Override makes the compiler verify this really replaces the consumer's method;
            // without it, a signature change would silently disable the fault injection.
            @Override
            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
                throw new TimeoutException("KABOOM!");
            }
        };
        InternalProcessorContext context =
            new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        // The mock is passed with its generic type intact; no raw Consumer cast is needed.
        return new StreamTask(
            this.taskId,
            this.partitions,
            topology,
            consumer,
            config,
            this.streamsMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            context
        );
    }

    /**
     * Builds a task whose topology reads "topic1" through {@code source1} and "topic2" through
     * {@code source3}, uses this test's shared {@code stateStore}, and has no changelog mapping.
     * NOTE(review): presumably {@code source3} is the node that misbehaves - confirm at its declaration.
     *
     * @param config streams configuration handed to both the processor context and the task
     * @return the newly constructed task
     */
    private StreamTask createFaultyStatefulTask(StreamsConfig config) {
        ProcessorTopology faultyTopology = ProcessorTopologyFactories.with(
            Arrays.asList(new ProcessorNode[]{this.source1, this.source3}),
            Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source3)}),
            Collections.singletonList(this.stateStore),
            Collections.emptyMap()
        );
        InternalProcessorContext processorContext =
            new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        return new StreamTask(
            this.taskId,
            this.partitions,
            faultyTopology,
            this.consumer,
            config,
            this.streamsMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            processorContext
        );
    }

    /**
     * Convenience overload that builds the stateful task with this test's own state manager.
     *
     * @param config streams configuration for the task
     * @param logged whether the task's store is created as logged
     * @return the newly constructed task
     */
    private StreamTask createStatefulTask(StreamsConfig config, boolean logged) {
        ProcessorStateManager defaultStateManager = this.stateManager;
        return this.createStatefulTask(config, logged, defaultStateManager);
    }

    /**
     * Builds a task with sources for "topic1" and "topic2" and a single store named "store".
     * When {@code logged} is true the store is mapped to the changelog topic "store-changelog";
     * otherwise the changelog mapping is empty.
     *
     * @param config       streams configuration handed to both the processor context and the task
     * @param logged       whether the store is created as logged and given a changelog topic
     * @param stateManager state manager used by the processor context and the task
     * @return the newly constructed task
     */
    private StreamTask createStatefulTask(StreamsConfig config, boolean logged, ProcessorStateManager stateManager) {
        MockKeyValueStore store = new MockKeyValueStore("store", logged);

        Map<String, String> storeToChangelogTopic;
        if (logged) {
            storeToChangelogTopic = Collections.singletonMap("store", "store-changelog");
        } else {
            storeToChangelogTopic = Collections.emptyMap();
        }

        ProcessorTopology statefulTopology = ProcessorTopologyFactories.with(
            Arrays.asList(new ProcessorNode[]{this.source1, this.source2}),
            Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}),
            Collections.singletonList(store),
            storeToChangelogTopic
        );
        InternalProcessorContext processorContext =
            new ProcessorContextImpl(this.taskId, config, stateManager, this.streamsMetrics, null);
        return new StreamTask(
            this.taskId,
            this.partitions,
            statefulTopology,
            this.consumer,
            config,
            this.streamsMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            stateManager,
            this.recordCollector,
            processorContext
        );
    }

    /**
     * Builds a store-less task whose two sources ("topic1" and "topic2") both forward to
     * {@code processorStreamTime} and {@code processorSystemTime}.
     *
     * <p>Side effects: adds children to the shared source nodes, records expectations on the
     * {@code stateManager} and {@code recordCollector} mocks, and switches both mocks to replay.
     *
     * @param config                streams configuration handed to the processor context and the task
     * @param builtInMetricsVersion version string passed to the task's own {@code StreamsMetricsImpl}
     * @return the newly constructed task
     */
    private StreamTask createStatelessTask(StreamsConfig config, String builtInMetricsVersion) {
        ProcessorTopology statelessTopology = StreamTaskTest.withSources(
            Arrays.asList(new ProcessorNode[]{this.source1, this.source2, this.processorStreamTime, this.processorSystemTime}),
            Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)})
        );

        // Wire every source to both downstream processors.
        this.source1.addChild(this.processorStreamTime);
        this.source2.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source2.addChild(this.processorSystemTime);

        // changelogPartitions() is expected exactly once; the other lookups may repeat freely.
        EasyMock.expect((Object)this.stateManager.changelogPartitions())
            .andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.stateManager.changelogOffsets())
            .andReturn(Collections.emptyMap())
            .anyTimes();
        EasyMock.expect((Object)this.recordCollector.offsets())
            .andReturn(Collections.emptyMap())
            .anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        InternalProcessorContext processorContext =
            new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        StreamsMetricsImpl taskMetrics =
            new StreamsMetricsImpl(this.metrics, "test", builtInMetricsVersion, this.time);
        return new StreamTask(
            this.taskId,
            this.partitions,
            statelessTopology,
            this.consumer,
            config,
            taskMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            processorContext
        );
    }

    /**
     * Builds a store-less task over {@code partition1} in which the given source node reads
     * "topic1" and forwards to {@code processorStreamTime}.
     *
     * <p>Side effects: adds a child to {@code sourceNode}, records expectations on the
     * {@code stateManager} and {@code recordCollector} mocks, and switches both mocks to replay.
     *
     * @param sourceNode source node to register for "topic1"
     * @return the newly constructed task
     */
    private StreamTask createStatelessTaskWithForwardingTopology(SourceNode<Integer, Integer, Integer, Integer> sourceNode) {
        ProcessorTopology forwardingTopology = StreamTaskTest.withSources(
            Arrays.asList(new ProcessorNode[]{sourceNode, this.processorStreamTime}),
            Collections.singletonMap("topic1", sourceNode)
        );
        sourceNode.addChild(this.processorStreamTime);

        // changelogPartitions() is expected exactly once; offsets() may be queried repeatedly.
        EasyMock.expect((Object)this.stateManager.changelogPartitions())
            .andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.recordCollector.offsets())
            .andReturn(Collections.emptyMap())
            .anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        StreamsConfig config = StreamTaskTest.createConfig(false, "0");
        InternalProcessorContext processorContext =
            new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        StreamsMetricsImpl taskMetrics = new StreamsMetricsImpl(this.metrics, "test", "latest", this.time);
        return new StreamTask(
            this.taskId,
            Collections.singleton(this.partition1),
            forwardingTopology,
            this.consumer,
            config,
            taskMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            processorContext
        );
    }

    /**
     * Creates a record for the given topic-partition using this test's shared key and value.
     * The {@code offset} argument is used both as the record offset and as its timestamp.
     *
     * @param topicPartition topic and partition the record belongs to
     * @param offset         record offset, also used as the CREATE_TIME timestamp
     * @return the new record
     */
    private ConsumerRecord<byte[], byte[]> getConsumerRecord(TopicPartition topicPartition, long offset) {
        String topic = topicPartition.topic();
        int partition = topicPartition.partition();
        return new ConsumerRecord(
            topic,
            partition,
            offset,
            offset,
            TimestampType.CREATE_TIME,
            0L,
            0,
            0,
            (Object)this.recordKey,
            (Object)this.recordValue
        );
    }

    /**
     * Creates a record on "topic1", partition 1, whose key is the serialized form of
     * {@code key} and whose value is this test's shared record value. The {@code offset}
     * argument is used both as the record offset and as its timestamp.
     *
     * @param key    integer key, serialized with {@code IntegerSerializer}
     * @param offset record offset, also used as the CREATE_TIME timestamp
     * @return the new record
     */
    private ConsumerRecord<byte[], byte[]> getConsumerRecord(Integer key, long offset) {
        byte[] serializedKey = new IntegerSerializer().serialize("topic1", key);
        return new ConsumerRecord(
            "topic1",
            1,
            offset,
            offset,
            TimestampType.CREATE_TIME,
            0L,
            0,
            0,
            (Object)serializedKey,
            (Object)this.recordValue
        );
    }

    /**
     * Registers a cumulative-sum metric on the thread-level "task-closed" sensor of this
     * test's streams metrics.
     *
     * @return the name under which the metric was registered
     */
    private MetricName setupCloseTaskMetric() {
        MetricName closeMetricName = new MetricName("name", "group", "description", Collections.emptyMap());
        Sensor taskClosedSensor = this.streamsMetrics.threadLevelSensor(
            this.threadId,
            "task-closed",
            Sensor.RecordingLevel.INFO,
            new Sensor[0]
        );
        taskClosedSensor.add(closeMetricName, (MeasurableStat)new CumulativeSum());
        return closeMetricName;
    }

    /**
     * Asserts that the metric registered under {@code metricName} currently measures exactly
     * {@code expected}.
     *
     * @param expected       value the metric must report
     * @param streamsMetrics metrics registry to look the metric up in
     * @param metricName     name of the metric to check
     */
    private void verifyCloseTaskMetric(double expected, StreamsMetricsImpl streamsMetrics, MetricName metricName) {
        KafkaMetric closeMetric = (KafkaMetric)streamsMetrics.metrics().get(metricName);
        double observedCloses = closeMetric
            .measurable()
            .measure(closeMetric.config(), System.currentTimeMillis());
        MatcherAssert.assertThat(
            (Object)observedCloses,
            (Matcher)CoreMatchers.equalTo((Object)expected)
        );
    }

    /**
     * Decompiler-recovered lambda body used by
     * {@code shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic}: constructs a
     * {@code StreamTask} from the supplied topology, metrics and context and discards it.
     * The calling test expects this construction to throw.
     */
    private /* synthetic */ void lambda$shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic$18(ProcessorTopology topology, StreamsMetricsImpl metrics, InternalProcessorContext context) throws Throwable {
        StreamsConfig taskConfig = StreamTaskTest.createConfig(false, "100");
        new StreamTask(
            this.taskId,
            this.partitions,
            topology,
            this.consumer,
            taskConfig,
            metrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            context
        );
    }
}

