package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.IMocksControl;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
@RunWith(EasyMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTaskTest.class */
public class StreamTaskTest {
    // Value used for the "application.id" entry of every config built by createConfig.
    private static final String APPLICATION_ID = "stream-task-test";
    // Temporary directory whose canonical path becomes "state.dir"; deleted again in cleanup().
    private static final File BASE_DIR = TestUtils.tempDirectory();
    // NOTE(review): not referenced in the visible part of the file — presumably used by record helpers further down.
    private static final long DEFAULT_TIMESTAMP = 1000;
    // Re-created in setup(); individual tests may replace it with an EasyMock mock.
    private StateDirectory stateDirectory;
    // Task under test; cleanup() suspends and dirty-closes it when it is non-null.
    private StreamTask task;
    // NOTE(review): presumably written by the punctuator field below — confirm against its body.
    private long punctuatedAt;

    // Nice mock (populated via the EasyMockRunner declared on the class); setup() stubs taskId() and taskType() on it.
    @Mock(type = MockType.NICE)
    private ProcessorStateManager stateManager;

    // Nice mock of the record collector.
    @Mock(type = MockType.NICE)
    private RecordCollector recordCollector;

    // Nice mock of the thread cache.
    @Mock(type = MockType.NICE)
    private ThreadCache cache;
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    // The two input partitions; setup() assigns both to the mock consumer with beginning offset 0.
    private final TopicPartition partition1 = new TopicPartition("topic1", 1);
    private final TopicPartition partition2 = new TopicPartition("topic2", 1);
    private final Set<TopicPartition> partitions = Utils.mkSet(new TopicPartition[]{this.partition1, this.partition2});
    private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
    private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
    // Source nodes whose numReceived / initialized fields are inspected by the tests.
    private final MockSourceNode<Integer, Integer, Integer, Integer> source1 = new MockSourceNode<>(this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(this.intDeserializer, this.intDeserializer);
    // Source node that always fails: both process() and close() throw RuntimeException("KABOOM!").
    private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(this.intDeserializer, this.intDeserializer) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.1
        @Override // org.apache.kafka.test.MockSourceNode
        public void process(Record<Integer, Integer> record) {
            throw new RuntimeException("KABOOM!");
        }

        @Override // org.apache.kafka.test.MockSourceNode
        public void close() {
            throw new RuntimeException("KABOOM!");
        }
    };
    // NOTE(review): the constructor argument 10 is presumably the punctuation interval — confirm in MockProcessorNode.
    private final MockProcessorNode<Integer, Integer, ?, ?> processorStreamTime = new MockProcessorNode<>(10);
    private final MockProcessorNode<Integer, Integer, ?, ?> processorSystemTime = new MockProcessorNode<>(10, PunctuationType.WALL_CLOCK_TIME);
    private final String storeName = "store";
    // State store registered with the state manager in the stateful-task tests.
    private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
    private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 1);
    // In-memory consumer; tests inspect its paused() set and committed offsets.
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    // Serialized Integer 10 and Integer 1.
    private final byte[] recordValue = this.intSerializer.serialize((String) null, 10);
    private final byte[] recordKey = this.intSerializer.serialize((String) null, 1);
    // Name of the thread that constructed this test instance; used to build the expected JMX MBean name in testMetrics.
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0);
    // time and metrics are non-final because shouldRecordE2ELatencyOnSourceNodeAndTerminalNodes replaces them.
    private MockTime time = new MockTime();
    private Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), this.time);
    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(this.metrics);
    /**
     * Punctuator that records the timestamp of its most recent invocation in
     * {@code punctuatedAt}, so tests can observe when punctuation fired.
     *
     * <p>Restored by hand: the decompiler could not inline the synthetic accessor
     * {@code access$002(StreamTaskTest, long)} and emitted a body that always threw
     * {@link UnsupportedOperationException}. That accessor is a setter for a
     * non-final {@code long} instance field of the enclosing class, and
     * {@code punctuatedAt} is the only such field.
     */
    private final Punctuator punctuator = new Punctuator() {
        /**
         * Stores the punctuation timestamp on the enclosing test instance.
         *
         * @param timestamp the time passed in by the punctuation schedule
         */
        @Override
        public void punctuate(long timestamp) {
            StreamTaskTest.this.punctuatedAt = timestamp;
        }
    };

    /**
     * No-argument constructor; performs no work of its own beyond the field
     * initializers declared on the class.
     */
    public StreamTaskTest() {
    }

    /**
     * Creates a {@link ProcessorTopology} from the given processor nodes, source-node
     * map and string set; every other constructor argument is an empty collection.
     *
     * @param list processor nodes handed to the topology
     * @param map  source nodes keyed by string
     * @param set  string set passed as the last constructor argument (the repartition
     *             topics, going by this method's name)
     * @return the newly constructed topology
     */
    private static ProcessorTopology withRepartitionTopics(List<ProcessorNode<?, ?, ?, ?>> list, Map<String, SourceNode<?, ?, ?, ?>> map, Set<String> set) {
        final ProcessorTopology topology = new ProcessorTopology(
            list,
            map,
            Collections.emptyMap(),
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyMap(),
            set
        );
        return topology;
    }

    /**
     * Creates a {@link ProcessorTopology} from the given processor nodes and
     * source-node map, with every other constructor argument left empty.
     *
     * @param list processor nodes handed to the topology
     * @param map  source nodes keyed by string
     * @return the newly constructed topology
     */
    private static ProcessorTopology withSources(List<ProcessorNode<?, ?, ?, ?>> list, Map<String, SourceNode<?, ?, ?, ?>> map) {
        // Same construction as withRepartitionTopics, with an empty final set.
        return withRepartitionTopics(list, map, Collections.emptySet());
    }

    /**
     * Builds the {@link StreamsConfig} used by the tests.
     *
     * @param z   when {@code true} the "processing.guarantee" entry is "exactly_once",
     *            otherwise "at_least_once"
     * @param str value for the "max.task.idle.ms" entry
     * @return a config whose "state.dir" is the canonical path of {@code BASE_DIR}
     * @throws RuntimeException wrapping the {@link IOException} if the canonical path
     *                          of {@code BASE_DIR} cannot be resolved
     */
    private static StreamsConfig createConfig(boolean z, String str) {
        final String stateDir;
        try {
            stateDir = BASE_DIR.getCanonicalPath();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        final String guarantee;
        if (z) {
            guarantee = "exactly_once";
        } else {
            guarantee = "at_least_once";
        }

        return new StreamsConfig(Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", APPLICATION_ID),
            Utils.mkEntry("bootstrap.servers", "localhost:2171"),
            Utils.mkEntry("buffered.records.per.partition", "3"),
            Utils.mkEntry("state.dir", stateDir),
            Utils.mkEntry("default.timestamp.extractor", MockTimestampExtractor.class.getName()),
            Utils.mkEntry("processing.guarantee", guarantee),
            Utils.mkEntry("max.task.idle.ms", str)
        )));
    }

    /**
     * Per-test fixture: stubs the state manager's task id and task type, assigns both
     * input partitions to the mock consumer with beginning offset 0, and creates a
     * fresh {@link StateDirectory} from a non-EOS config with "max.task.idle.ms" of 100.
     */
    @Before
    public void setup() {
        EasyMock.expect(stateManager.taskId()).andStubReturn(taskId);
        EasyMock.expect(stateManager.taskType()).andStubReturn(Task.TaskType.ACTIVE);

        consumer.assign(Arrays.asList(partition1, partition2));
        final Map<TopicPartition, Long> beginningOffsets = Utils.mkMap(
            Utils.mkEntry(partition1, 0L),
            Utils.mkEntry(partition2, 0L));
        consumer.updateBeginningOffsets(beginningOffsets);

        final StreamsConfig config = createConfig(false, "100");
        stateDirectory = new StateDirectory(config, new MockTime(), true);
    }

    /**
     * Tears down the task created by a test (if any) and removes the shared state
     * directory.
     *
     * <p>The task is suspended and then closed dirty. An {@link IllegalStateException}
     * whose message starts with "Illegal state CLOSED" is tolerated, since the test may
     * already have closed the task; any other {@link IllegalStateException} is rethrown.
     * Other runtime failures of {@code suspend()} are ignored on purpose, because the
     * task is closed dirty right afterwards.
     *
     * <p>The {@code task} reference is cleared and {@code BASE_DIR} is deleted in a
     * {@code finally} block, so a failing teardown does not leak state into the next test.
     *
     * @throws IOException if deleting {@code BASE_DIR} fails
     */
    @After
    public void cleanup() throws IOException {
        try {
            if (this.task != null) {
                try {
                    this.task.suspend();
                } catch (IllegalStateException e) {
                    // getMessage() may be null; treat a message-less exception as unexpected
                    // instead of failing with a NullPointerException that hides it.
                    final String message = e.getMessage();
                    if (message == null || !message.startsWith("Illegal state CLOSED")) {
                        throw e;
                    }
                } catch (RuntimeException ignored) {
                    // Best-effort suspend: the task is closed dirty below regardless.
                }
                this.task.closeDirty();
            }
        } finally {
            this.task = null;
            Utils.delete(BASE_DIR);
        }
    }

    /**
     * A stateful task must fail initialization with {@link LockException} when the
     * state directory refuses the lock for its task id.
     */
    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        // The lock attempt is reported as unsuccessful.
        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
        EasyMock.expectLastCall();
        EasyMock.replay(stateDirectory, stateManager);

        task = createStatefulTask(createConfig(false, "100"), false);

        Assert.assertThrows(LockException.class, () -> task.initializeIfNeeded());
    }

    /**
     * Initializes a stateless task against a state-directory mock that has no recorded
     * expectations, then verifies the mock.
     */
    @Test
    public void shouldNotAttemptToLockIfNoStores() {
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.replay(stateDirectory);

        final StreamsConfig config = createConfig(false, "100");
        task = createStatelessTask(config, "latest");
        task.initializeIfNeeded();

        EasyMock.verify(stateDirectory);
    }

    /**
     * With exactly-once enabled, suspending and dirty-closing a stateful task must hit
     * the state manager and state directory in the exact order recorded on the strict
     * control below.
     */
    @Test
    public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException {
        final IMocksControl strictControl = EasyMock.createStrictControl();
        final ProcessorStateManager strictStateManager = strictControl.createMock(ProcessorStateManager.class);
        EasyMock.expect(strictStateManager.taskType()).andStubReturn(Task.TaskType.ACTIVE);
        stateDirectory = strictControl.createMock(StateDirectory.class);

        // Expected call sequence; the recording order is significant for a strict control.
        strictStateManager.registerGlobalStateStores(Collections.emptyList());
        EasyMock.expectLastCall();
        EasyMock.expect(strictStateManager.taskId()).andReturn(taskId);
        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
        strictStateManager.close();
        EasyMock.expectLastCall();
        EasyMock.expect(strictStateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
        stateDirectory.unlock(taskId);
        EasyMock.expectLastCall();

        strictControl.checkOrder(true);
        strictControl.replay();

        final StreamsConfig eosConfig = createConfig(true, "100");
        task = createStatefulTask(eosConfig, true, strictStateManager);
        task.suspend();
        task.closeDirty();
        // Clear the reference so cleanup() does not suspend/close the task a second time.
        task = null;

        strictControl.verify();
    }

    /**
     * The stream time must be -1 before initialization and 10 after restoration
     * completes, when offset metadata encoding timestamp 10 was committed for every
     * input partition.
     */
    @Test
    public void shouldReadCommittedStreamTimeOnInitialize() {
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.replay(stateDirectory);

        // Commit offset 0 with an encoded timestamp of 10 for each input partition.
        final Map<TopicPartition, OffsetAndMetadata> committed = partitions.stream()
            .collect(Collectors.toMap(
                Function.identity(),
                ignoredPartition -> new OffsetAndMetadata(0L, StreamTask.encodeTimestamp(10L))));
        consumer.commitSync(committed);

        task = createStatelessTask(createConfig(false, "100"), "latest");
        Assert.assertEquals(-1L, task.streamTime());

        task.initializeIfNeeded();
        task.completeRestoration();

        Assert.assertEquals(10L, task.streamTime());
    }

    /**
     * Walks a stateful task through CREATED, RESTORING and RUNNING, and checks that the
     * source nodes are initialized only once restoration has completed.
     */
    @Test
    public void shouldTransitToRestoringThenRunningAfterCreation() throws IOException {
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(changelogPartition, 10L));
        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
        EasyMock.expectLastCall();
        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateDirectory, stateManager, recordCollector);

        task = createStatefulTask(createConfig(false, "100"), true);
        Assert.assertEquals(Task.State.CREATED, task.state());

        // First initialization moves the task into RESTORING without touching the sources.
        task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RESTORING, task.state());
        Assert.assertFalse(source1.initialized);
        Assert.assertFalse(source2.initialized);

        // A second initialization leaves the state unchanged.
        task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RESTORING, task.state());

        task.completeRestoration();
        Assert.assertEquals(Task.State.RUNNING, task.state());
        Assert.assertTrue(source1.initialized);
        Assert.assertTrue(source2.initialized);

        EasyMock.verify(stateDirectory);
    }

    /**
     * Buffers three records on each partition and processes them one at a time,
     * checking after every step how many records remain buffered and how many each
     * source node has received.
     */
    @Test
    public void shouldProcessInOrder() {
        task = createStatelessTask(createConfig(false, "0"), "latest");

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 10L),
            getConsumerRecord(partition1, 20L),
            getConsumerRecord(partition1, 30L)));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 25L),
            getConsumerRecord(partition2, 35L),
            getConsumerRecord(partition2, 45L)));

        // One row per process() call: {buffered, received by source1, received by source2}.
        final long[][] expectedAfterEachStep = {
            {5L, 1L, 0L},
            {4L, 2L, 0L},
            {3L, 2L, 1L},
            {2L, 3L, 1L},
            {1L, 3L, 2L},
            {0L, 3L, 3L},
        };
        for (final long[] expected : expectedAfterEachStep) {
            Assert.assertTrue(task.process(0L));
            Assert.assertEquals(expected[0], task.numBuffered());
            Assert.assertEquals(expected[1], source1.numReceived);
            Assert.assertEquals(expected[2], source2.numReceived);
        }
    }

    /**
     * The "active-buffer-count" task metric must report 0 initially, 2 after two
     * records were added and recorded, and 1 after one of them was processed.
     */
    @Test
    public void shouldRecordBufferedRecords() {
        task = createStatelessTask(createConfig(false, "0"), "latest");

        final KafkaMetric bufferCount = getMetric("active-buffer", "%s-count", task.id().toString(), "latest");
        MatcherAssert.assertThat(bufferCount.metricValue(), CoreMatchers.equalTo(0.0d));

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 10L),
            getConsumerRecord(partition1, 20L)));
        task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
        MatcherAssert.assertThat(bufferCount.metricValue(), CoreMatchers.equalTo(2.0d));

        task.process(0L);
        task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
        MatcherAssert.assertThat(bufferCount.metricValue(), CoreMatchers.equalTo(1.0d));
    }

    /**
     * The "active-process-ratio" task metric must change only when
     * {@code recordProcessTimeRatioAndBufferSize} is called: 0 initially, 0.25 after
     * batches of 10 and 15 against a total of 100, and 1.0 after two batches of 10
     * against a total of 20.
     */
    @Test
    public void shouldRecordProcessRatio() {
        task = createStatelessTask(createConfig(false, "0"), "latest");

        final KafkaMetric processRatio = getMetric("active-process", "%s-ratio", task.id().toString(), "latest");
        MatcherAssert.assertThat(processRatio.metricValue(), CoreMatchers.equalTo(0.0d));

        task.recordProcessBatchTime(10L);
        task.recordProcessBatchTime(15L);
        task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
        MatcherAssert.assertThat(processRatio.metricValue(), CoreMatchers.equalTo(0.25d));

        // Recording a batch time alone does not move the ratio.
        task.recordProcessBatchTime(10L);
        MatcherAssert.assertThat(processRatio.metricValue(), CoreMatchers.equalTo(0.25d));

        task.recordProcessBatchTime(10L);
        task.recordProcessTimeRatioAndBufferSize(20L, time.milliseconds());
        MatcherAssert.assertThat(processRatio.metricValue(), CoreMatchers.equalTo(1.0d));
    }

    /**
     * Checks the "record-e2e-latency" avg/min/max metrics of a source node and of
     * {@code processorStreamTime}.
     *
     * <p>The source node forwards only records with an even key, so the second
     * processor's metrics are expected to stay unchanged for the odd keys 1 and 3.
     */
    @Test
    public void shouldRecordE2ELatencyOnSourceNodeAndTerminalNodes() {
        time = new MockTime(0L, 0L, 0L);
        metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);

        final MockSourceNode<Integer, Integer, Integer, Integer> evenKeyForwarder =
            new MockSourceNode<Integer, Integer, Integer, Integer>(intDeserializer, intDeserializer) {
                InternalProcessorContext context;

                @Override
                public void init(InternalProcessorContext internalProcessorContext) {
                    this.context = internalProcessorContext;
                    super.init(internalProcessorContext);
                }

                @Override
                public void process(Record<Integer, Integer> record) {
                    // Forward even keys only; odd keys end at this node.
                    final boolean evenKey = record.key() % 2 == 0;
                    if (evenKey) {
                        this.context.forward(record);
                    }
                }
            };

        task = createStatelessTaskWithForwardingTopology(evenKeyForwarder);
        task.initializeIfNeeded();
        task.completeRestoration();

        final String sourceName = evenKeyForwarder.name();
        final String downstreamName = processorStreamTime.name();

        final Metric sourceAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), sourceName, "latest");
        final Metric sourceMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), sourceName, "latest");
        final Metric sourceMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceName, "latest");
        final Metric downstreamAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), downstreamName, "latest");
        final Metric downstreamMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), downstreamName, "latest");
        final Metric downstreamMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), downstreamName, "latest");

        // Key 0 (forwarded), processed with argument 10.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(Integer.valueOf(0), 0L)));
        task.process(10L);
        MatcherAssert.assertThat(sourceAvg.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(sourceMin.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(sourceMax.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(downstreamAvg.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(downstreamMin.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(downstreamMax.metricValue(), CoreMatchers.equalTo(10.0d));

        // Key 1 (not forwarded), processed with argument 15.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(Integer.valueOf(1), 0L)));
        task.process(15L);
        MatcherAssert.assertThat(sourceAvg.metricValue(), CoreMatchers.equalTo(12.5d));
        MatcherAssert.assertThat(sourceMin.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(sourceMax.metricValue(), CoreMatchers.equalTo(15.0d));
        MatcherAssert.assertThat(downstreamAvg.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(downstreamMin.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(downstreamMax.metricValue(), CoreMatchers.equalTo(10.0d));

        // Key 2 (forwarded), processed with argument 23.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(Integer.valueOf(2), 0L)));
        task.process(23L);
        MatcherAssert.assertThat(sourceAvg.metricValue(), CoreMatchers.equalTo(16.0d));
        MatcherAssert.assertThat(sourceMin.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(sourceMax.metricValue(), CoreMatchers.equalTo(23.0d));
        MatcherAssert.assertThat(downstreamAvg.metricValue(), CoreMatchers.equalTo(16.5d));
        MatcherAssert.assertThat(downstreamMin.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(downstreamMax.metricValue(), CoreMatchers.equalTo(23.0d));

        // Key 3 (not forwarded), processed with argument 5.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(Integer.valueOf(3), 0L)));
        task.process(5L);
        MatcherAssert.assertThat(sourceAvg.metricValue(), CoreMatchers.equalTo(13.25d));
        MatcherAssert.assertThat(sourceMin.metricValue(), CoreMatchers.equalTo(5.0d));
        MatcherAssert.assertThat(sourceMax.metricValue(), CoreMatchers.equalTo(23.0d));
        MatcherAssert.assertThat(downstreamAvg.metricValue(), CoreMatchers.equalTo(16.5d));
        MatcherAssert.assertThat(downstreamMin.metricValue(), CoreMatchers.equalTo(10.0d));
        MatcherAssert.assertThat(downstreamMax.metricValue(), CoreMatchers.equalTo(23.0d));
    }

    /** Runs the shared metric-construction checks for built-in metrics version "0.10.0-2.4". */
    @Test
    public void shouldConstructMetricsWithBuiltInMetricsVersion0100To24() {
        final String builtInMetricsVersion = "0.10.0-2.4";
        testMetrics(builtInMetricsVersion);
    }

    /** Runs the shared metric-construction checks for built-in metrics version "latest". */
    @Test
    public void shouldConstructMetricsWithBuiltInMetricsVersionLatest() {
        final String builtInMetricsVersion = "latest";
        testMetrics(builtInMetricsVersion);
    }

    /**
     * Creates a stateless task for the given built-in metrics version and checks that
     * the expected task-level metrics and JMX MBeans exist.
     *
     * @param str built-in metrics version, either "0.10.0-2.4" or "latest"
     */
    private void testMetrics(String str) {
        task = createStatelessTask(createConfig(false, "100"), str);
        final String taskName = task.id().toString();

        // Metrics that must exist for both versions.
        Assert.assertNotNull(getMetric("enforced-processing", "%s-rate", taskName, str));
        Assert.assertNotNull(getMetric("enforced-processing", "%s-total", taskName, str));
        Assert.assertNotNull(getMetric("record-lateness", "%s-avg", taskName, str));
        Assert.assertNotNull(getMetric("record-lateness", "%s-max", taskName, str));
        Assert.assertNotNull(getMetric("active-process", "%s-ratio", taskName, str));
        Assert.assertNotNull(getMetric("active-buffer", "%s-count", taskName, str));

        final boolean legacyVersion = "0.10.0-2.4".equals(str);
        if (legacyVersion) {
            testMetricsForBuiltInMetricsVersion0100To24();
        } else {
            testMetricsForBuiltInMetricsVersionLatest();
        }

        final JmxReporter reporter = new JmxReporter();
        reporter.contextChange(new KafkaMetricsContext("kafka.streams"));
        metrics.addReporter(reporter);

        // The thread tag is "thread-id" for "latest" and "client-id" otherwise.
        final String threadTagKey;
        if ("latest".equals(str)) {
            threadTagKey = "thread-id";
        } else {
            threadTagKey = "client-id";
        }

        final String taskMbean = String.format(
            "kafka.streams:type=stream-task-metrics,%s=%s,task-id=%s", threadTagKey, threadId, taskName);
        Assert.assertTrue(reporter.containsMbean(taskMbean));

        if (legacyVersion) {
            final String allTasksMbean = String.format(
                "kafka.streams:type=stream-task-metrics,%s=%s,task-id=all", threadTagKey, threadId);
            Assert.assertTrue(reporter.containsMbean(allTasksMbean));
        }
    }

    /**
     * Checks for version "latest": no "commit" metrics under task id "all", while the
     * "process" latency and all "punctuate" metrics exist for the current task.
     */
    private void testMetricsForBuiltInMetricsVersionLatest() {
        final String[] commitFormats = {"%s-latency-avg", "%s-latency-max", "%s-rate", "%s-total"};
        for (final String format : commitFormats) {
            Assert.assertNull(getMetric("commit", format, "all", "latest"));
        }

        final String taskName = task.id().toString();
        Assert.assertNotNull(getMetric("process", "%s-latency-max", taskName, "latest"));
        Assert.assertNotNull(getMetric("process", "%s-latency-avg", taskName, "latest"));

        final String[] punctuateFormats = {"%s-latency-avg", "%s-latency-max", "%s-rate", "%s-total"};
        for (final String format : punctuateFormats) {
            Assert.assertNotNull(getMetric("punctuate", format, taskName, "latest"));
        }
    }

    /**
     * Checks for version "0.10.0-2.4": the "commit-rate" metric exists under task id
     * "all", while the "process" latency and all "punctuate" metrics are absent for the
     * current task.
     */
    private void testMetricsForBuiltInMetricsVersion0100To24() {
        final String version = "0.10.0-2.4";
        Assert.assertNotNull(getMetric("commit", "%s-rate", "all", version));

        final String taskName = task.id().toString();
        for (final String format : new String[] {"%s-latency-avg", "%s-latency-max"}) {
            Assert.assertNull(getMetric("process", format, taskName, version));
        }
        for (final String format : new String[] {"%s-latency-avg", "%s-latency-max", "%s-rate", "%s-total"}) {
            Assert.assertNull(getMetric("punctuate", format, taskName, version));
        }
    }

    /**
     * Looks up a metric in the "stream-task-metrics" group of the metrics registry.
     *
     * @param str  operation name substituted into {@code str2}
     * @param str2 format string producing the metric name, e.g. "%s-rate"
     * @param str3 value of the "task-id" tag
     * @param str4 built-in metrics version; "latest" selects the "thread-id" tag key,
     *             anything else selects "client-id"
     * @return the matching metric, or {@code null} if none is registered
     */
    private KafkaMetric getMetric(String str, String str2, String str3, String str4) {
        final String threadTagKey;
        if ("latest".equals(str4)) {
            threadTagKey = "thread-id";
        } else {
            threadTagKey = "client-id";
        }

        final Map<String, String> tags = Utils.mkMap(
            Utils.mkEntry("task-id", str3),
            Utils.mkEntry(threadTagKey, Thread.currentThread().getName()));
        final MetricName wanted = metrics.metricName(String.format(str2, str), "stream-task-metrics", "", tags);

        return metrics.metrics().get(wanted);
    }

    /**
     * Looks up a metric in the "stream-processor-node-metrics" group by name, filtered
     * by the task, processor-node and thread tags.
     *
     * @param str  operation name substituted into {@code str2}
     * @param str2 format string producing the metric name, e.g. "%s-avg"
     * @param str3 value of the "task-id" tag
     * @param str4 value of the "processor-node-id" tag
     * @param str5 built-in metrics version; "latest" selects the "thread-id" tag key,
     *             anything else selects "client-id"
     * @return whatever {@code StreamsTestUtils.getMetricByNameFilterByTags} returns for
     *         that name, group and tag filter
     */
    private Metric getProcessorMetric(String str, String str2, String str3, String str4, String str5) {
        final String threadTagKey;
        if ("latest".equals(str5)) {
            threadTagKey = "thread-id";
        } else {
            threadTagKey = "client-id";
        }

        final Map<String, String> filterTags = Utils.mkMap(
            Utils.mkEntry("task-id", str3),
            Utils.mkEntry("processor-node-id", str4),
            Utils.mkEntry(threadTagKey, Thread.currentThread().getName()));

        return StreamsTestUtils.getMetricByNameFilterByTags(
            metrics.metrics(),
            String.format(str2, str),
            "stream-processor-node-metrics",
            filterTags);
    }

    /**
     * Tracks which partitions the consumer has paused as records are buffered and
     * processed, with "buffered.records.per.partition" set to 3 by {@code createConfig}.
     */
    @Test
    public void shouldPauseAndResumeBasedOnBufferedRecords() {
        task = createStatelessTask(createConfig(false, "100"), "latest");

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 10L),
            getConsumerRecord(partition1, 20L)));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 35L),
            getConsumerRecord(partition2, 45L),
            getConsumerRecord(partition2, 55L),
            getConsumerRecord(partition2, 65L)));

        // After the first record, only partition2 (four records buffered) is paused.
        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(1L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        Assert.assertEquals(1L, consumer.paused().size());
        Assert.assertTrue(consumer.paused().contains(partition2));

        // Three more records on partition1 pause that partition as well.
        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 30L),
            getConsumerRecord(partition1, 40L),
            getConsumerRecord(partition1, 50L)));
        Assert.assertEquals(2L, consumer.paused().size());
        Assert.assertTrue(consumer.paused().contains(partition1));
        Assert.assertTrue(consumer.paused().contains(partition2));

        // Processing from partition1 resumes it; partition2 stays paused.
        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(2L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        Assert.assertEquals(1L, consumer.paused().size());
        Assert.assertTrue(consumer.paused().contains(partition2));

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(3L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        Assert.assertEquals(1L, consumer.paused().size());
        Assert.assertTrue(consumer.paused().contains(partition2));

        // The first record taken from partition2 resumes it too.
        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(3L, source1.numReceived);
        Assert.assertEquals(1L, source2.numReceived);
        Assert.assertEquals(0L, consumer.paused().size());
    }

    /**
     * Interleaves maybePunctuateStreamTime() and process() over eight buffered records and
     * finally expects stream-time punctuations at 20, 142, 155 and 160.
     */
    @Test
    public void shouldPunctuateOnceStreamTimeAfterGap() {
        task = createStatelessTask(createConfig(false, "0"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 20L),
            getConsumerRecord(partition1, 142L),
            getConsumerRecord(partition1, 155L),
            getConsumerRecord(partition1, 160L)));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 25L),
            getConsumerRecord(partition2, 145L),
            getConsumerRecord(partition2, 159L),
            getConsumerRecord(partition2, 161L)));

        // Nothing has been processed yet, so no punctuation is due.
        Assert.assertFalse(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(7L, task.numBuffered());
        Assert.assertEquals(1L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        Assert.assertTrue(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(6L, task.numBuffered());
        Assert.assertEquals(1L, source1.numReceived);
        Assert.assertEquals(1L, source2.numReceived);
        Assert.assertFalse(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(5L, task.numBuffered());
        Assert.assertEquals(2L, source1.numReceived);
        Assert.assertEquals(1L, source2.numReceived);
        Assert.assertTrue(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(4L, task.numBuffered());
        Assert.assertEquals(2L, source1.numReceived);
        Assert.assertEquals(2L, source2.numReceived);
        Assert.assertFalse(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(3L, task.numBuffered());
        Assert.assertEquals(3L, source1.numReceived);
        Assert.assertEquals(2L, source2.numReceived);
        Assert.assertTrue(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(2L, task.numBuffered());
        Assert.assertEquals(3L, source1.numReceived);
        Assert.assertEquals(3L, source2.numReceived);
        Assert.assertFalse(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(1L, task.numBuffered());
        Assert.assertEquals(4L, source1.numReceived);
        Assert.assertEquals(3L, source2.numReceived);
        Assert.assertTrue(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process(0L));
        Assert.assertEquals(0L, task.numBuffered());
        Assert.assertEquals(4L, source1.numReceived);
        Assert.assertEquals(4L, source2.numReceived);
        Assert.assertFalse(task.maybePunctuateStreamTime());

        // Exactly four punctuations are expected, despite the jump from 20 to 142.
        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(
            PunctuationType.STREAM_TIME, 20, 142, 155, 160);
    }

    /**
     * Cancels the stream-time schedule after one punctuation and expects that no further
     * punctuation is recorded (only the one at 20).
     */
    @Test
    public void shouldRespectPunctuateCancellationStreamTime() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 20L),
            getConsumerRecord(partition1, 30L),
            getConsumerRecord(partition1, 40L)));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 25L),
            getConsumerRecord(partition2, 35L),
            getConsumerRecord(partition2, 45L)));

        Assert.assertFalse(task.maybePunctuateStreamTime());
        Assert.assertTrue(task.process(0L));
        Assert.assertTrue(task.maybePunctuateStreamTime());
        Assert.assertTrue(task.process(0L));
        Assert.assertFalse(task.maybePunctuateStreamTime());
        Assert.assertTrue(task.process(0L));

        // Cancel the schedule; the next check must not punctuate.
        processorStreamTime.mockProcessor.scheduleCancellable().cancel();
        Assert.assertFalse(task.maybePunctuateStreamTime());

        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20);
    }

    /**
     * Cancels the wall-clock schedule after one punctuation and expects that advancing the
     * mock clock again does not trigger another one.
     */
    @Test
    public void shouldRespectPunctuateCancellationSystemTime() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        final long startMs = time.milliseconds();

        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        // After cancellation another 10 ms must not produce a punctuation.
        processorSystemTime.mockProcessor.scheduleCancellable().cancel();
        time.sleep(10L);
        Assert.assertFalse(task.maybePunctuateSystemTime());

        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(
            PunctuationType.WALL_CLOCK_TIME, startMs + 10);
    }

    /**
     * Checks that commitNeeded() becomes true after processing a record, after a stream-time
     * punctuation and after a wall-clock punctuation, stays true across prepareCommit(), and
     * is false again after postCommit(true).
     */
    @Test
    public void shouldRespectCommitNeeded() {
        task = createStatelessTask(createConfig(false, "0"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();
        Assert.assertFalse(task.commitNeeded());

        // 1) Processing a record.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        Assert.assertTrue(task.process(0L));
        Assert.assertTrue(task.commitNeeded());
        task.prepareCommit();
        Assert.assertTrue(task.commitNeeded());
        task.postCommit(true);
        Assert.assertFalse(task.commitNeeded());

        // 2) Stream-time punctuation.
        Assert.assertTrue(task.maybePunctuateStreamTime());
        Assert.assertTrue(task.commitNeeded());
        task.prepareCommit();
        Assert.assertTrue(task.commitNeeded());
        task.postCommit(true);
        Assert.assertFalse(task.commitNeeded());

        // 3) Wall-clock punctuation after advancing the mock clock.
        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertTrue(task.commitNeeded());
        task.prepareCommit();
        Assert.assertTrue(task.commitNeeded());
        task.postCommit(true);
        Assert.assertFalse(task.commitNeeded());
    }

    /**
     * Processes two of three buffered records and expects prepareCommit() to report offset 5
     * for partition1 together with the encoded timestamp 3.
     */
    @Test
    public void shouldCommitNextOffsetFromQueueIfAvailable() {
        task = createStatelessTask(createConfig(false, "0"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 0L),
            getConsumerRecord(partition1, 3L),
            getConsumerRecord(partition1, 5L)));

        // Two records are processed; the third stays in the queue.
        task.process(0L);
        task.process(0L);

        MatcherAssert.assertThat(
            task.prepareCommit(),
            CoreMatchers.equalTo(
                Utils.mkMap(
                    Utils.mkEntry(partition1, new OffsetAndMetadata(5L, StreamTask.encodeTimestamp(3L))))));
    }

    /**
     * Polls three records through the consumer, processes the single record handed to the
     * task, and expects prepareCommit() to report offset 3 with the encoded timestamp 0.
     */
    @Test
    public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
        task = createStatelessTask(createConfig(false, "0"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // Feed three records to the consumer and poll them.
        consumer.addRecord(getConsumerRecord(partition1, 0L));
        consumer.addRecord(getConsumerRecord(partition1, 1L));
        consumer.addRecord(getConsumerRecord(partition1, 2L));
        consumer.poll(Duration.ZERO);

        // Only one record reaches the task, and it is processed right away.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process(0L);

        MatcherAssert.assertThat(
            task.prepareCommit(),
            CoreMatchers.equalTo(
                Utils.mkMap(
                    Utils.mkEntry(partition1, new OffsetAndMetadata(3L, StreamTask.encodeTimestamp(0L))))));
    }

    /**
     * Expects prepareCommit() on a CLOSED task to throw an IllegalStateException with a
     * specific message.
     */
    @Test
    public void shouldFailOnCommitIfTaskIsClosed() {
        task = createStatelessTask(createConfig(false, "0"), "latest");
        task.suspend();
        task.transitionTo(Task.State.CLOSED);

        final IllegalStateException thrown =
            Assert.assertThrows(IllegalStateException.class, task::prepareCommit);

        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Illegal state CLOSED while preparing active task 0_0 for committing"));
    }

    /**
     * Expects commitRequested() to return true once requestCommit() has been called.
     */
    @Test
    public void shouldRespectCommitRequested() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.requestCommit();

        Assert.assertTrue(task.commitRequested());
    }

    /**
     * Round-trips DEFAULT_TIMESTAMP through encodeTimestamp() and decodeTimestamp().
     */
    @Test
    public void shouldEncodeAndDecodeMetadata() {
        task = createStatelessTask(createConfig(false, "100"), "latest");

        Assert.assertEquals(
            DEFAULT_TIMESTAMP,
            task.decodeTimestamp(StreamTask.encodeTimestamp(DEFAULT_TIMESTAMP)));
    }

    /**
     * Expects decodeTimestamp() to return -1 for metadata that is the Base64 encoding of the
     * single byte 2.
     */
    @Test
    public void shouldReturnUnknownTimestampIfUnknownVersion() {
        task = createStatelessTask(createConfig(false, "100"), "latest");

        final byte[] rawMetadata = {2};
        final String encodedMetadata = Base64.getEncoder().encodeToString(rawMetadata);

        Assert.assertEquals(-1L, task.decodeTimestamp(encodedMetadata));
    }

    /**
     * Expects decodeTimestamp() to return -1 for an empty metadata string.
     */
    @Test
    public void shouldReturnUnknownTimestampIfEmptyMessage() {
        task = createStatelessTask(createConfig(false, "100"), "latest");

        final String emptyMetadata = "";

        Assert.assertEquals(-1L, task.decodeTimestamp(emptyMetadata));
    }

    /**
     * Expects process(0) to return false while no partition, or only partition1, has a
     * buffered record, and true once partition2 has one as well.
     */
    @Test
    public void shouldBeProcessableIfAllPartitionsBuffered() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // Nothing buffered at all.
        Assert.assertFalse(task.process(0L));

        final byte[] payload = ByteBuffer.allocate(4).putInt(1).array();

        // Only partition1 has data.
        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>("topic1", 1, 0L, payload, payload)));
        Assert.assertFalse(task.process(0L));

        // Both partitions have data.
        task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>("topic2", 1, 0L, payload, payload)));
        Assert.assertTrue(task.process(0L));
    }

    /**
     * Calls process() with increasing wall-clock arguments while only some partitions hold
     * data, and tracks the value of the "enforced-processing-total" task metric after each call.
     */
    @Test
    public void shouldBeProcessableIfWaitedForTooLong() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        final MetricName enforcedProcessing = metrics.metricName(
            "enforced-processing-total",
            "stream-task-metrics",
            Utils.mkMap(
                Utils.mkEntry("thread-id", Thread.currentThread().getName()),
                Utils.mkEntry("task-id", taskId.toString())));

        Assert.assertFalse(task.process(time.milliseconds()));
        Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(enforcedProcessing).metricValue());

        final byte[] payload = ByteBuffer.allocate(4).putInt(1).array();

        // Three records for partition1 only.
        task.addRecords(partition1, Arrays.asList(
            new ConsumerRecord<>("topic1", 1, 0L, payload, payload),
            new ConsumerRecord<>("topic1", 1, 1L, payload, payload),
            new ConsumerRecord<>("topic1", 1, 2L, payload, payload)));

        Assert.assertFalse(task.process(time.milliseconds()));
        Assert.assertFalse(task.process(time.milliseconds() + 99));

        // From +100 ms on, processing goes ahead and the metric is incremented.
        Assert.assertTrue(task.process(time.milliseconds() + 100));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());

        Assert.assertTrue(task.process(time.milliseconds() + 101));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());

        // With partition2 buffered as well, processing does not bump the metric.
        task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>("topic2", 1, 0L, payload, payload)));
        Assert.assertTrue(task.process(time.milliseconds() + 130));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());

        Assert.assertFalse(task.process(time.milliseconds() + 150));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());

        Assert.assertFalse(task.process(time.milliseconds() + 249));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());

        Assert.assertTrue(task.process(time.milliseconds() + 250));
        Assert.assertEquals(Double.valueOf(3.0d), metrics.metric(enforcedProcessing).metricValue());
    }

    /**
     * Feeds single records to partition1 only and tracks both the result of process() and the
     * "enforced-processing-total" task metric as the wall-clock argument increases.
     */
    @Test
    public void shouldNotBeProcessableIfNoDataAvailble() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        final MetricName enforcedProcessing = metrics.metricName(
            "enforced-processing-total",
            "stream-task-metrics",
            Utils.mkMap(
                Utils.mkEntry("thread-id", Thread.currentThread().getName()),
                Utils.mkEntry("task-id", taskId.toString())));

        Assert.assertFalse(task.process(0L));
        Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(enforcedProcessing).metricValue());

        final byte[] payload = ByteBuffer.allocate(4).putInt(1).array();

        // One record for partition1; partition2 stays empty throughout.
        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>("topic1", 1, 0L, payload, payload)));
        Assert.assertFalse(task.process(time.milliseconds()));
        Assert.assertFalse(task.process(time.milliseconds() + 99));

        Assert.assertTrue(task.process(time.milliseconds() + 100));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());

        // No records are left, so this call returns false and the metric is unchanged.
        Assert.assertFalse(task.process(time.milliseconds() + 110));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());

        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>("topic1", 1, 0L, payload, payload)));
        Assert.assertFalse(task.process(time.milliseconds() + 150));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());

        Assert.assertFalse(task.process(time.milliseconds() + 249));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());

        Assert.assertTrue(task.process(time.milliseconds() + 250));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());
    }

    /**
     * Advances the mock clock in steps and expects wall-clock punctuations at +10, +20, +30
     * and +50 ms relative to the start time.
     */
    @Test
    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        final long startMs = time.milliseconds();

        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        // 9 ms is not enough; the remaining 1 ms completes the interval.
        time.sleep(9L);
        Assert.assertFalse(task.maybePunctuateSystemTime());
        time.sleep(1L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        // A 20 ms jump produces a single punctuation, not two.
        time.sleep(20L);
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertFalse(task.maybePunctuateSystemTime());

        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(
            PunctuationType.WALL_CLOCK_TIME,
            startMs + 10,
            startMs + 20,
            startMs + 30,
            startMs + 50);
    }

    /**
     * Expects no wall-clock punctuation either immediately or after advancing the mock clock
     * by 9 ms.
     */
    @Test
    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(9L);
        Assert.assertFalse(task.maybePunctuateSystemTime());

        // An empty expectation list: no punctuation may have been recorded.
        final long[] noPunctuations = new long[0];
        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, noPunctuations);
    }

    /**
     * Advances the mock clock by uneven amounts, including jumps of 100 and 105 ms, and expects
     * wall-clock punctuations at +100, +110, +122, +130, +235 and +240 ms.
     */
    @Test
    public void shouldPunctuateOnceSystemTimeAfterGap() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        final long startMs = time.milliseconds();

        // Large initial gap: one punctuation only.
        time.sleep(100L);
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(12L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(7L);
        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(1L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        // Another large gap: again a single punctuation.
        time.sleep(105L);
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(5L);
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertFalse(task.maybePunctuateSystemTime());

        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(
            PunctuationType.WALL_CLOCK_TIME,
            startMs + 100,
            startMs + 110,
            startMs + 122,
            startMs + 130,
            startMs + 235,
            startMs + 240);
    }

    /**
     * A KafkaException thrown by a stream-time punctuator must surface as a StreamsException
     * whose message names the processor, and the context's current node must be null afterwards.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        try {
            task.punctuate(processorStreamTime, 1L, PunctuationType.STREAM_TIME, timestamp -> {
                throw new KafkaException("KABOOM!");
            });
            Assert.fail("Should've thrown StreamsException");
        } catch (StreamsException wrapped) {
            final String msg = wrapped.getMessage();
            final String expectedFragment = "processor '" + processorStreamTime.name() + "'";
            Assert.assertTrue("message=" + msg + " should contain processor", msg.contains(expectedFragment));
            MatcherAssert.assertThat(task.processorContext().currentNode(), CoreMatchers.nullValue());
        }
    }

    /**
     * A KafkaException thrown by a wall-clock punctuator must surface as a StreamsException
     * whose message names the processor, and the context's current node must be null afterwards.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        try {
            task.punctuate(processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                throw new KafkaException("KABOOM!");
            });
            Assert.fail("Should've thrown StreamsException");
        } catch (StreamsException wrapped) {
            final String msg = wrapped.getMessage();
            final String expectedFragment = "processor '" + processorSystemTime.name() + "'";
            Assert.assertTrue("message=" + msg + " should contain processor", msg.contains(expectedFragment));
            MatcherAssert.assertThat(task.processorContext().currentNode(), CoreMatchers.nullValue());
        }
    }

    /**
     * Adds a header during one punctuate() call and expects the headers seen during the next
     * punctuate() call to be empty.
     */
    @Test
    public void shouldNotShareHeadersBetweenPunctuateIterations() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // First iteration leaves a header behind on the context.
        task.punctuate(
            processorSystemTime,
            1L,
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> task.processorContext().headers().add("dummy", (byte[]) null));

        // Second iteration must not see it.
        task.punctuate(
            processorSystemTime,
            1L,
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> Assert.assertFalse(task.processorContext().headers().iterator().hasNext()));
    }

    /**
     * Expects process() on the task built by createFaultyStatefulTask() to throw a
     * StreamsException.
     */
    @Test
    public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() {
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createFaultyStatefulTask(createConfig(false, "100"));
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 10L),
            getConsumerRecord(partition1, 20L),
            getConsumerRecord(partition1, 30L)));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 5L),
            getConsumerRecord(partition2, 35L),
            getConsumerRecord(partition2, 45L)));

        Assert.assertThrows(StreamsException.class, () -> task.process(0L));
    }

    /**
     * Expects completeRestoration() on the task built by createDisconnectedTask() to throw a
     * TimeoutException.
     */
    @Test
    public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() throws IOException {
        // Replace the state directory with a nice mock that grants the task lock.
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(recordCollector, stateDirectory, stateManager);

        task = createDisconnectedTask(createConfig(false, "100"));
        task.initializeIfNeeded();

        Assert.assertThrows(TimeoutException.class, task::completeRestoration);
    }

    /**
     * Follows the task through suspend(), resume() and completeRestoration(), checking the task
     * state and the source nodes' initialized flags at each stage.
     */
    @Test
    public void shouldReInitializeTopologyWhenResuming() throws IOException {
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
        // Any attempt to read collector offsets during this test is a failure.
        EasyMock.expect(recordCollector.offsets())
            .andThrow(new AssertionError("Should not try to read offsets"))
            .anyTimes();
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(recordCollector, stateDirectory, stateManager);

        task = createStatefulTask(createConfig(false, "100"), true);
        task.initializeIfNeeded();

        task.suspend();
        Assert.assertEquals(Task.State.SUSPENDED, task.state());
        Assert.assertFalse(source1.initialized);
        Assert.assertFalse(source2.initialized);

        task.resume();
        Assert.assertEquals(Task.State.RESTORING, task.state());
        Assert.assertFalse(source1.initialized);
        Assert.assertFalse(source2.initialized);

        // Source nodes are only initialized once restoration completes.
        task.completeRestoration();
        Assert.assertEquals(Task.State.RUNNING, task.state());
        Assert.assertTrue(source1.initialized);
        Assert.assertTrue(source2.initialized);

        EasyMock.verify(stateManager, recordCollector);

        // Re-arm the collector mock so a later offsets() call succeeds
        // (presumably for fixture cleanup after the test; confirm against the teardown).
        EasyMock.reset(recordCollector);
        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
        EasyMock.replay(recordCollector);
    }

    /**
     * Runs two prepareCommit()/postCommit() cycles while the state manager reports changelog
     * offsets 10 and then 20, and verifies that checkpoint() was called exactly once.
     */
    @Test
    public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
        EasyMock.expect(recordCollector.offsets())
            .andReturn(Collections.singletonMap(changelogPartition, 543L))
            .anyTimes();

        // Exactly one checkpoint is expected.
        stateManager.checkpoint();
        EasyMock.expectLastCall().once();

        EasyMock.expect(stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(changelogPartition, 10L))
            .andReturn(Collections.singletonMap(changelogPartition, 20L));
        EasyMock.expectLastCall();
        EasyMock.replay(stateManager, recordCollector);

        task = createStatefulTask(createConfig(false, "100"), true);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.prepareCommit();
        task.postCommit(true);

        task.prepareCommit();
        task.postCommit(false);

        EasyMock.verify(stateManager, recordCollector);
    }

    /**
     * Runs two prepareCommit()/postCommit() cycles while the state manager reports changelog
     * offsets 0, 10 and then 12000, with two checkpoint() calls declared on the mock.
     *
     * NOTE(review): only recordCollector is passed to verify(), so the declared checkpoint count
     * on stateManager is not enforced here; confirm whether that is intentional.
     */
    @Test
    public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() {
        EasyMock.expect(recordCollector.offsets())
            .andReturn(Collections.singletonMap(changelogPartition, 543L))
            .anyTimes();

        stateManager.checkpoint();
        EasyMock.expectLastCall().times(2);

        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
        EasyMock.expect(stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(changelogPartition, 0L))
            .andReturn(Collections.singletonMap(changelogPartition, 10L))
            .andReturn(Collections.singletonMap(changelogPartition, 12000L));

        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
        EasyMock.expectLastCall();
        EasyMock.replay(stateManager, recordCollector);

        task = createStatefulTask(createConfig(false, "100"), true);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.prepareCommit();
        task.postCommit(true);

        task.prepareCommit();
        task.postCommit(false);

        EasyMock.verify(recordCollector);
    }

    /**
     * With the first createConfig() argument set to true, a prepareCommit()/postCommit(false)
     * cycle must not leave a ".checkpoint" file in the task directory.
     */
    @Test
    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
        EasyMock.expectLastCall();
        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createStatefulTask(createConfig(true, "100"), true);
        task.initializeIfNeeded();
        task.completeRestoration();

        task.prepareCommit();
        task.postCommit(false);

        final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ".checkpoint");
        Assert.assertFalse(checkpointFile.exists());
    }

    /**
     * Expects punctuate() to throw an IllegalStateException when the processor context
     * already has a current node set.
     */
    @Test
    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        // Leave a node set on the context before punctuating.
        task.processorContext().setCurrentNode(processorStreamTime);

        // assertThrows makes the expectation explicit instead of relying on an empty catch block.
        Assert.assertThrows(
            "Should throw illegal state exception as current node is not null",
            IllegalStateException.class,
            () -> task.punctuate(processorStreamTime, 10L, PunctuationType.STREAM_TIME, punctuator));
    }

    /**
     * Calls punctuate() with timestamps 5 and 10 and expects the punctuatedAt field to hold
     * the most recent timestamp after each call.
     */
    @Test
    public void shouldCallPunctuateOnPassedInProcessorNode() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.punctuate(processorStreamTime, 5L, PunctuationType.STREAM_TIME, punctuator);
        MatcherAssert.assertThat(Long.valueOf(punctuatedAt), CoreMatchers.equalTo(5L));

        task.punctuate(processorStreamTime, 10L, PunctuationType.STREAM_TIME, punctuator);
        MatcherAssert.assertThat(Long.valueOf(punctuatedAt), CoreMatchers.equalTo(10L));
    }

    /**
     * Expects the processor context's current node to be null after punctuate() returns
     * normally.
     */
    @Test
    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccessfulPunctuate() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.initializeIfNeeded();
        task.completeRestoration();

        task.punctuate(processorStreamTime, 5L, PunctuationType.STREAM_TIME, punctuator);

        MatcherAssert.assertThat(
            task.processorContext().currentNode(),
            CoreMatchers.nullValue());
    }

    /**
     * Expects schedule() to throw an IllegalStateException when no current node has been set
     * on the processor context.
     *
     * The expectation is scoped to the schedule() call itself, so an IllegalStateException
     * raised while creating the task fails the test instead of passing it.
     */
    @Test
    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
        task = createStatelessTask(createConfig(false, "100"), "latest");

        Assert.assertThrows(
            IllegalStateException.class,
            () -> task.schedule(1L, PunctuationType.STREAM_TIME, timestamp -> { }));
    }

    /**
     * Expects schedule() to complete without throwing once a current node has been set on the
     * processor context.
     */
    @Test
    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
        task = createStatelessTask(createConfig(false, "100"), "latest");
        task.processorContext().setCurrentNode(processorStreamTime);

        // The punctuator is a no-op; only the schedule() call matters.
        task.schedule(1L, PunctuationType.STREAM_TIME, timestamp -> { });
    }

    /**
     * On the task built by createFaultyStatefulTask(), suspend() must throw a RuntimeException,
     * and a following closeDirty() must still call stateManager.close().
     */
    @Test
    public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expectLastCall();

        // close() is the call the final verify() is checking for.
        stateManager.close();
        EasyMock.expectLastCall();

        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        task = createFaultyStatefulTask(createConfig(false, "100"));
        task.initializeIfNeeded();
        task.completeRestoration();

        Assert.assertThrows(RuntimeException.class, () -> task.suspend());

        task.closeDirty();

        EasyMock.verify(stateManager);
    }

    /**
     * Builds a task over partition1 and a "repartition" partition, processes one record from
     * each, and expects purgeableOffsets() to contain only the repartition partition, at 11.
     */
    @Test
    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
        final TopicPartition repartition = new TopicPartition("repartition", 1);

        final ProcessorTopology topology = withRepartitionTopics(
            Arrays.asList(source1, source2),
            Utils.mkMap(
                Utils.mkEntry("topic1", source1),
                Utils.mkEntry(repartition.topic(), source2)),
            Collections.singleton(repartition.topic()));

        consumer.assign(Arrays.asList(partition1, repartition));
        consumer.updateBeginningOffsets(Utils.mkMap(Utils.mkEntry(repartition, 0L)));

        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(stateManager, recordCollector);

        final StreamsConfig config = createConfig(false, "0");

        // The task is constructed directly because it needs the custom topology.
        task = new StreamTask(
            taskId,
            Utils.mkSet(partition1, repartition),
            topology,
            consumer,
            config,
            streamsMetrics,
            stateDirectory,
            cache,
            time,
            stateManager,
            recordCollector,
            new ProcessorContextImpl(taskId, config, stateManager, streamsMetrics, (ThreadCache) null));
        task.initializeIfNeeded();
        task.completeRestoration();

        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 5L)));
        task.addRecords(repartition, Collections.singletonList(getConsumerRecord(repartition, 10L)));

        Assert.assertTrue(task.process(0L));
        Assert.assertTrue(task.process(0L));

        task.prepareCommit();

        MatcherAssert.assertThat(
            task.purgeableOffsets(),
            CoreMatchers.equalTo(Collections.singletonMap(repartition, 11L)));
    }

    /**
     * Builds a task around a consumer whose {@code committed} lookup throws a {@link KafkaException}
     * and expects {@code completeRestoration()} to fail with a {@link StreamsException}.
     */
    @Test
    public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.partition1));
        EasyMock.replay(this.stateManager);

        final StreamsConfig config = createConfig(false, "100");
        final Consumer<byte[], byte[]> failingConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
                throw new KafkaException("KABOOM!");
            }
        };
        this.task = createOptimizedStatefulTask(config, failingConsumer);
        this.task.initializeIfNeeded();

        Assert.assertThrows(StreamsException.class, this.task::completeRestoration);
    }

    /**
     * Moves a stateless task to CLOSED (via SUSPENDED) and expects {@code prepareCommit()} to
     * throw an {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIfCommittingOnIllegalState() {
        final StreamsConfig config = createConfig(false, "100");
        this.task = createStatelessTask(config, "latest");

        this.task.transitionTo(Task.State.SUSPENDED);
        this.task.transitionTo(Task.State.CLOSED);

        Assert.assertThrows(IllegalStateException.class, this.task::prepareCommit);
    }

    /**
     * Moves a stateless task to CLOSED (via SUSPENDED) and expects {@code postCommit(true)} to
     * throw an {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIfPostCommittingOnIllegalState() {
        final StreamsConfig config = createConfig(false, "100");
        this.task = createStatelessTask(config, "latest");

        this.task.transitionTo(Task.State.SUSPENDED);
        this.task.transitionTo(Task.State.CLOSED);

        Assert.assertThrows(IllegalStateException.class, () -> this.task.postCommit(true));
    }

    /**
     * Suspends a task that was never initialized and calls {@code postCommit(true)}; the mocked
     * state manager raises an {@link AssertionError} if a checkpoint is attempted.
     */
    @Test
    public void shouldSkipCheckpointingSuspendedCreatedTask() {
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
        EasyMock.replay(this.stateManager);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createStatefulTask(config, true);

        this.task.suspend();
        this.task.postCommit(true);
    }

    /**
     * Initializes and suspends a stateful task, then verifies that {@code postCommit(true)}
     * triggers exactly one checkpoint on the state manager.
     */
    @Test
    public void shouldCheckpointForSuspendedTask() {
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(this.partition1, 1L));
        EasyMock.replay(this.stateManager);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createStatefulTask(config, true);
        this.task.initializeIfNeeded();

        this.task.suspend();
        this.task.postCommit(true);

        EasyMock.verify(this.stateManager);
    }

    /**
     * The state manager reports changelog offsets 1 and then 2 for the partition; with
     * non-enforced post-commits before and after suspension, any checkpoint call fails the test.
     */
    @Test
    public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
        EasyMock.expect(this.stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(this.partition1, 1L))
            .andReturn(Collections.singletonMap(this.partition1, 2L));
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        EasyMock.replay(this.stateManager);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createStatefulTask(config, true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();

        // First commit cycle while running.
        this.task.prepareCommit();
        this.task.postCommit(false);

        // Second post-commit after suspension.
        this.task.suspend();
        this.task.postCommit(false);

        EasyMock.verify(this.stateManager);
    }

    /**
     * The state manager reports changelog offsets 12000 and then 24000 for the partition; the
     * test verifies that both non-enforced post-commits (before and after suspension) checkpoint.
     */
    @Test
    public void shouldCheckpointForSuspendedRunningTaskWithLargeProgress() {
        EasyMock.expect(this.stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(this.partition1, 12000L))
            .andReturn(Collections.singletonMap(this.partition1, 24000L));
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().times(2);
        EasyMock.replay(this.stateManager);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createStatefulTask(config, true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();

        // First commit cycle while running.
        this.task.prepareCommit();
        this.task.postCommit(false);

        // Second post-commit after suspension.
        this.task.suspend();
        this.task.postCommit(false);

        EasyMock.verify(this.stateManager);
    }

    /**
     * Processes a record, suspends the running task and enforces a post-commit; verifies that the
     * state manager is asked once to update its changelog offsets with the map returned by the
     * record collector, and once to checkpoint.
     */
    @Test
    public void shouldCheckpointWhileUpdateSnapshotWithTheConsumedOffsetsForSuspendedRunningTask() {
        final Map<TopicPartition, Long> checkpoint = Collections.singletonMap(this.partition1, 1L);

        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        this.stateManager.updateChangelogOffsets(EasyMock.eq(checkpoint));
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(checkpoint);
        EasyMock.expect(this.recordCollector.offsets()).andReturn(checkpoint).once();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsConfig config = createConfig(false, "0");
        this.task = createStatefulTask(config, true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();

        this.task.addRecords(this.partition1, Collections.singleton(getConsumerRecord(this.partition1, 10L)));
        this.task.process(100L);
        Assert.assertTrue(this.task.commitNeeded());

        this.task.suspend();
        this.task.postCommit(true);

        EasyMock.verify(this.stateManager, this.recordCollector);
    }

    /**
     * Checks {@code changelogOffsets()} in two phases: after initialization it equals the map
     * returned by the state manager (partition -> 50); after {@code completeRestoration()} it
     * maps the same partition to -2.
     */
    @Test
    public void shouldReturnStateManagerChangelogOffsets() {
        EasyMock.expect(this.stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(this.partition1, 50L))
            .anyTimes();
        EasyMock.expect(this.stateManager.changelogPartitions())
            .andReturn(Collections.singleton(this.partition1))
            .anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.initializeIfNeeded();

        final Map<TopicPartition, Long> beforeRestoration = Collections.singletonMap(this.partition1, 50L);
        Assert.assertEquals(beforeRestoration, this.task.changelogOffsets());

        this.task.completeRestoration();

        final Map<TopicPartition, Long> afterRestoration = Collections.singletonMap(this.partition1, -2L);
        Assert.assertEquals(afterRestoration, this.task.changelogOffsets());
    }

    /**
     * Suspends and cleanly closes a task that was never initialized. The mocked state manager
     * fails the test on any flush or checkpoint; the test also checks that the source node was
     * neither initialized nor closed and that the close-task metric reads 1.0.
     */
    @Test
    public void shouldNotCheckpointOnCloseCreated() {
        this.stateManager.flush();
        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final MetricName closeMetric = setupCloseTaskMetric();
        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);

        this.task.suspend();
        this.task.closeClean();

        Assert.assertEquals(Task.State.CLOSED, this.task.state());
        Assert.assertFalse(this.source1.initialized);
        Assert.assertFalse(this.source1.closed);

        EasyMock.verify(this.stateManager, this.recordCollector);
        verifyCloseTaskMetric(1.0d, this.streamsMetrics, closeMetric);
    }

    /**
     * Runs a task through initialize, restore, suspend, commit and clean close without processing
     * any records, and verifies the state manager saw exactly one flush and one checkpoint.
     */
    @Test
    public void shouldCheckpointOnCloseRestoringIfNoProgress() {
        this.stateManager.flush();
        EasyMock.expectLastCall().once();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();

        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(true);
        this.task.closeClean();

        Assert.assertEquals(Task.State.CLOSED, this.task.state());
        EasyMock.verify(this.stateManager);
    }

    /**
     * Processes one record, suspends the task and runs a non-enforced commit cycle; verifies that
     * the state manager checkpoints once and that the task remains SUSPENDED.
     */
    @Test
    public void shouldCheckpointOffsetsOnPostCommit() {
        EasyMock.expect(this.recordCollector.offsets())
            .andReturn(Collections.singletonMap(this.changelogPartition, 543L))
            .anyTimes();
        // NOTE(review): this bare expectLastCall() follows an expectation that is already fully
        // configured; it is kept only to leave the recorded mock interaction unchanged.
        EasyMock.expectLastCall();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(this.partition1, 12543L));
        EasyMock.replay(this.recordCollector, this.stateManager);

        final StreamsConfig config = createConfig(false, "0");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();

        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecord(this.partition1, 345L)));
        this.task.process(100L);
        Assert.assertTrue(this.task.commitNeeded());

        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(false);

        Assert.assertEquals(Task.State.SUSPENDED, this.task.state());
        EasyMock.verify(this.stateManager);
    }

    /**
     * Makes {@code stateManager.close()} throw a {@link ProcessorStateException} and expects
     * {@code closeClean()} to propagate it while the close-task metric stays at 0.0. Afterwards
     * the state manager mock is re-armed with a close() expectation that does not throw.
     */
    @Test
    public void shouldThrowExceptionOnCloseCleanError() {
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogPartitions())
            .andReturn(Collections.singleton(this.changelogPartition))
            .anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(this.changelogPartition, 543L))
            .anyTimes();
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        EasyMock.replay(this.recordCollector, this.stateManager);

        final MetricName closeMetric = setupCloseTaskMetric();
        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();

        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecord(this.partition1, 543L)));
        this.task.process(100L);
        Assert.assertTrue(this.task.commitNeeded());

        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(true);
        Assert.assertThrows(ProcessorStateException.class, () -> this.task.closeClean());

        verifyCloseTaskMetric(0.0d, this.streamsMetrics, closeMetric);
        EasyMock.verify(this.stateManager);

        // Re-arm the mock so a later close() on the state manager no longer throws.
        EasyMock.reset(this.stateManager);
        EasyMock.expect(this.stateManager.changelogPartitions())
            .andStubReturn(Collections.singleton(this.changelogPartition));
        this.stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.replay(this.stateManager);
    }

    /**
     * Makes {@code stateManager.flushCache()} throw a {@link ProcessorStateException} and expects
     * {@code prepareCommit()} to propagate it, leaving the task RUNNING. The mock fails the test
     * if flush, checkpoint or close is reached; the close-task metric must stay at 0.0.
     */
    @Test
    public void shouldThrowOnCloseCleanFlushError() {
        EasyMock.expect(this.recordCollector.offsets())
            .andReturn(Collections.singletonMap(this.changelogPartition, 543L));
        this.stateManager.flushCache();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        this.stateManager.flush();
        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.recordCollector, this.stateManager);

        final MetricName closeMetric = setupCloseTaskMetric();
        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();

        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecord(this.partition1, 543L)));
        this.task.process(100L);

        Assert.assertThrows(ProcessorStateException.class, this.task::prepareCommit);
        Assert.assertEquals(Task.State.RUNNING, this.task.state());

        verifyCloseTaskMetric(0.0d, this.streamsMetrics, closeMetric);
        EasyMock.verify(this.stateManager);

        // Re-arm the mock with only a changelogPartitions() expectation.
        EasyMock.reset(this.stateManager);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(this.stateManager);
    }

    /**
     * Makes {@code stateManager.checkpoint()} throw a {@link ProcessorStateException} and expects
     * an enforced {@code postCommit(true)} to propagate it, leaving the task SUSPENDED. The mock
     * fails the test if close() is reached; the close-task metric must stay at 0.0.
     */
    @Test
    public void shouldThrowOnCloseCleanCheckpointError() {
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap());
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(this.partition1, 54300L));
        EasyMock.replay(this.recordCollector, this.stateManager);

        final MetricName closeMetric = setupCloseTaskMetric();
        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.initializeIfNeeded();

        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecord(this.partition1, 54300L)));
        this.task.process(100L);
        Assert.assertTrue(this.task.commitNeeded());

        this.task.suspend();
        this.task.prepareCommit();
        Assert.assertThrows(ProcessorStateException.class, () -> this.task.postCommit(true));
        Assert.assertEquals(Task.State.SUSPENDED, this.task.state());

        verifyCloseTaskMetric(0.0d, this.streamsMetrics, closeMetric);
        EasyMock.verify(this.stateManager);

        // Re-arm the mock so a later close() on the state manager is an expected call.
        EasyMock.reset(this.stateManager);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        this.stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.replay(this.stateManager);
    }

    /**
     * Makes {@code stateManager.close()} throw a {@link RuntimeException} and checks that
     * {@code closeDirty()} on a suspended task completes without propagating it.
     */
    @Test
    public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.initializeIfNeeded();

        this.task.suspend();
        this.task.closeDirty();

        EasyMock.verify(this.stateManager);
    }

    /**
     * Checks that task-tagged metrics exist for a suspended task and that none remain after
     * {@code closeClean()}.
     */
    @Test
    public void shouldUnregisterMetricsInCloseClean() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.suspend();

        MatcherAssert.assertThat(getTaskMetrics(), Matchers.not(Matchers.empty()));
        this.task.closeClean();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    /**
     * Checks that task-tagged metrics exist for a suspended task and that none remain after
     * {@code closeDirty()}.
     */
    @Test
    public void shouldUnregisterMetricsInCloseDirty() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.suspend();

        MatcherAssert.assertThat(getTaskMetrics(), Matchers.not(Matchers.empty()));
        this.task.closeDirty();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    /**
     * Checks that task-tagged metrics exist for a suspended task and that none remain after
     * {@code closeCleanAndRecycleState()}.
     */
    @Test
    public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.suspend();

        MatcherAssert.assertThat(getTaskMetrics(), Matchers.not(Matchers.empty()));
        this.task.closeCleanAndRecycleState();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    /**
     * Closes a suspended task cleanly twice and then dirtily once; the test passes if none of the
     * repeated close calls throws. The state manager mock is reset and replayed with no
     * expectations at the end.
     */
    @Test
    public void closeShouldBeIdempotent() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createOptimizedStatefulTask(config, this.consumer);
        this.task.suspend();

        this.task.closeClean();
        // Repeated closes, clean and dirty, on the already closed task.
        this.task.closeClean();
        this.task.closeDirty();

        EasyMock.reset(this.stateManager);
        EasyMock.replay(this.stateManager);
    }

    /**
     * Adds a partition of "newTopic" to the task's input partitions via {@code update(...)} and
     * checks that {@code inputPartitions()} then equals the enlarged set.
     */
    @Test
    public void shouldUpdatePartitions() {
        final StreamsConfig config = createConfig(false, "0");
        this.task = createStatelessTask(config, "latest");

        final Set<TopicPartition> newPartitions = new HashSet<>(this.task.inputPartitions());
        newPartitions.add(new TopicPartition("newTopic", 0));

        this.task.update(
            newPartitions,
            Utils.mkMap(
                Utils.mkEntry(this.source1.name(), Arrays.asList("topic1", "newTopic")),
                Utils.mkEntry(this.source2.name(), Collections.singletonList("topic2"))));

        MatcherAssert.assertThat(this.task.inputPartitions(), CoreMatchers.equalTo(newPartitions));
    }

    /**
     * Processes a record so that a commit is needed, then expects {@code closeClean()} on the
     * still-running task to throw a {@link TaskMigratedException}.
     */
    @Test
    public void shouldThrowIfCleanClosingDirtyTask() {
        final StreamsConfig config = createConfig(false, "0");
        this.task = createStatelessTask(config, "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration();

        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecord(this.partition1, 0L)));
        this.task.process(0L);
        Assert.assertTrue(this.task.commitNeeded());

        Assert.assertThrows(TaskMigratedException.class, () -> this.task.closeClean());
    }

    /**
     * Processes a record so that a commit is needed, then expects
     * {@code closeCleanAndRecycleState()} on the still-running task to throw a
     * {@link TaskMigratedException}.
     */
    @Test
    public void shouldThrowIfRecyclingDirtyTask() {
        final StreamsConfig config = createConfig(false, "0");
        this.task = createStatelessTask(config, "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration();

        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecord(this.partition1, 0L)));
        this.task.process(0L);
        Assert.assertTrue(this.task.commitNeeded());

        Assert.assertThrows(TaskMigratedException.class, () -> this.task.closeCleanAndRecycleState());
    }

    /**
     * Expects {@code closeCleanAndRecycleState()} to throw {@link IllegalStateException} right
     * after construction, after initialization and after restoration, and to succeed once the
     * task is suspended; the mocks verify that the state manager was recycled and the record
     * collector closed cleanly.
     */
    @Test
    public void shouldOnlyRecycleSuspendedTasks() {
        this.stateManager.recycle();
        this.recordCollector.closeClean();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createStatefulTask(config, true);
        Assert.assertThrows(IllegalStateException.class, () -> this.task.closeCleanAndRecycleState());

        this.task.initializeIfNeeded();
        Assert.assertThrows(IllegalStateException.class, () -> this.task.closeCleanAndRecycleState());

        this.task.completeRestoration();
        Assert.assertThrows(IllegalStateException.class, () -> this.task.closeCleanAndRecycleState());

        this.task.suspend();
        this.task.closeCleanAndRecycleState();

        EasyMock.verify(this.stateManager, this.recordCollector);
    }

    /**
     * Checks that a freshly constructed task is CREATED and becomes SUSPENDED after
     * {@code suspend()}.
     */
    @Test
    public void shouldAlwaysSuspendCreatedTasks() {
        EasyMock.replay(this.stateManager);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createStatefulTask(config, true);
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.CREATED));

        this.task.suspend();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    /**
     * Checks that an initialized task is RESTORING and becomes SUSPENDED after {@code suspend()}.
     */
    @Test
    public void shouldAlwaysSuspendRestoringTasks() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createStatefulTask(config, true);
        this.task.initializeIfNeeded();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.RESTORING));

        this.task.suspend();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    /**
     * Uses the faulty stateful task: once RUNNING, {@code suspend()} is expected to throw a
     * {@link RuntimeException}, yet the task must still end up SUSPENDED.
     */
    @Test
    public void shouldAlwaysSuspendRunningTasks() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager);

        final StreamsConfig config = createConfig(false, "100");
        this.task = createFaultyStatefulTask(config);
        this.task.initializeIfNeeded();
        this.task.completeRestoration();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.RUNNING));

        Assert.assertThrows(RuntimeException.class, () -> this.task.suspend());
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    /**
     * Constructs a {@link StreamTask} over a topology with no nodes and no sources and expects the
     * constructor to throw a {@link TopologyException} with the exact message asserted below.
     */
    @Test
    public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
        final ProcessorContextImpl context = new ProcessorContextImpl(
            this.taskId, createConfig(false, "100"), this.stateManager, this.streamsMetrics, (ThreadCache) null);
        final StreamsMetricsImpl taskMetrics = new StreamsMetricsImpl(this.metrics, "test", "latest", this.time);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.replay(this.stateManager);

        final ProcessorTopology emptyTopology = withSources(Arrays.asList(new ProcessorNode[0]), Utils.mkMap());

        final TopologyException thrown = Assert.assertThrows(
            TopologyException.class,
            () -> new StreamTask(
                this.taskId,
                this.partitions,
                emptyTopology,
                this.consumer,
                createConfig(false, "100"),
                taskMetrics,
                this.stateDirectory,
                this.cache,
                this.time,
                this.stateManager,
                this.recordCollector,
                context));

        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.equalTo("Invalid topology: Topic is unkown to the topology. This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order."));
    }

    /**
     * Calls {@code maybeInitTaskTimeoutOrThrow} at time 0 and at exactly 5 minutes without
     * failure, then expects a {@link TimeoutException} at 5 minutes plus one millisecond.
     */
    @Test
    public void shouldInitTaskTimeoutAndEventuallyThrow() {
        final Logger log = new LogContext().logger(StreamTaskTest.class);
        this.task = createStatelessTask(createConfig(false, "0"), "latest");
        final Duration fiveMinutes = Duration.ofMinutes(5L);

        this.task.maybeInitTaskTimeoutOrThrow(0L, (TimeoutException) null, log);
        this.task.maybeInitTaskTimeoutOrThrow(fiveMinutes.toMillis(), (TimeoutException) null, log);

        Assert.assertThrows(
            TimeoutException.class,
            () -> this.task.maybeInitTaskTimeoutOrThrow(
                fiveMinutes.plus(Duration.ofMillis(1L)).toMillis(), (TimeoutException) null, log));
    }

    /**
     * Starts the task timeout at time 0, clears it, and checks that a later call at 5 minutes
     * plus one millisecond does not throw.
     */
    @Test
    public void shouldCLearTaskTimeout() {
        final Logger log = new LogContext().logger(StreamTaskTest.class);
        this.task = createStatelessTask(createConfig(false, "0"), "latest");

        this.task.maybeInitTaskTimeoutOrThrow(0L, (TimeoutException) null, log);
        this.task.clearTaskTimeout(log);

        final long justPastFiveMinutes = Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis();
        this.task.maybeInitTaskTimeoutOrThrow(justPastFiveMinutes, (TimeoutException) null, log);
    }

    /**
     * Collects the names of all registered metrics that carry a {@code "task-id"} tag.
     *
     * @return the matching metric names, possibly empty
     */
    private List<MetricName> getTaskMetrics() {
        return this.metrics.metrics().keySet().stream()
            .filter(name -> name.tags().containsKey("task-id"))
            .collect(Collectors.toList());
    }

    /**
     * Creates a task over {@code partition1} only, with a single source for "topic1" and a store
     * named "store" whose changelog topic is mapped to the input topic "topic1".
     *
     * @param streamsConfig configuration for the task and its processor context
     * @param consumer      consumer handed to the task
     * @return the new task
     */
    private StreamTask createOptimizedStatefulTask(StreamsConfig streamsConfig, Consumer<byte[], byte[]> consumer) {
        final ProcessorTopology topology = ProcessorTopologyFactories.with(
            Collections.singletonList(this.source1),
            Utils.mkMap(Utils.mkEntry("topic1", this.source1)),
            Collections.singletonList(new MockKeyValueStore("store", true)),
            Collections.singletonMap("store", "topic1"));
        final ProcessorContextImpl context =
            new ProcessorContextImpl(this.taskId, streamsConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null);

        return new StreamTask(
            this.taskId,
            Utils.mkSet(this.partition1),
            topology,
            consumer,
            streamsConfig,
            this.streamsMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            context);
    }

    /**
     * Creates a two-source task whose consumer throws a {@link TimeoutException} from
     * {@code committed(...)}; the topology has one store named "store" and no changelog mapping.
     *
     * @param streamsConfig configuration for the task and its processor context
     * @return the new task
     */
    private StreamTask createDisconnectedTask(StreamsConfig streamsConfig) {
        final ProcessorTopology topology = ProcessorTopologyFactories.with(
            Arrays.asList(this.source1, this.source2),
            Utils.mkMap(Utils.mkEntry("topic1", this.source1), Utils.mkEntry("topic2", this.source2)),
            Collections.singletonList(new MockKeyValueStore("store", false)),
            Collections.emptyMap());
        final Consumer<byte[], byte[]> timingOutConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
                throw new TimeoutException("KABOOM!");
            }
        };
        final ProcessorContextImpl context =
            new ProcessorContextImpl(this.taskId, streamsConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null);

        return new StreamTask(
            this.taskId,
            this.partitions,
            topology,
            timingOutConsumer,
            streamsConfig,
            this.streamsMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            context);
    }

    /**
     * Creates a task whose "topic2" source is {@code source3} instead of {@code source2}, with
     * the shared {@code stateStore} and no changelog mapping.
     *
     * @param streamsConfig configuration for the task and its processor context
     * @return the new task
     */
    private StreamTask createFaultyStatefulTask(StreamsConfig streamsConfig) {
        final ProcessorTopology topology = ProcessorTopologyFactories.with(
            Arrays.asList(this.source1, this.source3),
            Utils.mkMap(Utils.mkEntry("topic1", this.source1), Utils.mkEntry("topic2", this.source3)),
            Collections.singletonList(this.stateStore),
            Collections.emptyMap());
        final ProcessorContextImpl context =
            new ProcessorContextImpl(this.taskId, streamsConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null);

        return new StreamTask(
            this.taskId,
            this.partitions,
            topology,
            this.consumer,
            streamsConfig,
            this.streamsMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            context);
    }

    /**
     * Convenience overload that builds a stateful task backed by this test's default
     * {@code stateManager}.
     *
     * @param streamsConfig configuration for the task
     * @param z             forwarded unchanged to the three-argument overload
     * @return the new task
     */
    private StreamTask createStatefulTask(StreamsConfig streamsConfig, boolean z) {
        final ProcessorStateManager defaultStateManager = this.stateManager;
        return createStatefulTask(streamsConfig, z, defaultStateManager);
    }

    /**
     * Creates a two-source task with one store named "store".
     *
     * @param streamsConfig         configuration for the task and its processor context
     * @param z                     passed to the {@code MockKeyValueStore} constructor; when true
     *                              the store is also mapped to the changelog topic
     *                              "store-changelog", otherwise no changelog mapping is given
     * @param processorStateManager state manager used by both the task and its context
     * @return the new task
     */
    private StreamTask createStatefulTask(StreamsConfig streamsConfig, boolean z, ProcessorStateManager processorStateManager) {
        final ProcessorTopology topology = ProcessorTopologyFactories.with(
            Arrays.asList(this.source1, this.source2),
            Utils.mkMap(Utils.mkEntry("topic1", this.source1), Utils.mkEntry("topic2", this.source2)),
            Collections.singletonList(new MockKeyValueStore("store", z)),
            z ? Collections.singletonMap("store", "store-changelog") : Collections.emptyMap());
        final ProcessorContextImpl context =
            new ProcessorContextImpl(this.taskId, streamsConfig, processorStateManager, this.streamsMetrics, (ThreadCache) null);

        return new StreamTask(
            this.taskId,
            this.partitions,
            topology,
            this.consumer,
            streamsConfig,
            this.streamsMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            processorStateManager,
            this.recordCollector,
            context);
    }

    /**
     * Creates a task with two sources that each feed both the stream-time and the system-time
     * processor. Also records empty-result expectations on the state manager and record collector
     * mocks and switches them to replay mode.
     *
     * <p>The task receives a new {@code StreamsMetricsImpl} built with {@code str}, while its
     * processor context uses the shared {@code streamsMetrics} field.
     *
     * @param streamsConfig configuration for the task and its processor context
     * @param str           third constructor argument of the task's {@code StreamsMetricsImpl}
     * @return the new task
     */
    private StreamTask createStatelessTask(StreamsConfig streamsConfig, String str) {
        final ProcessorTopology topology = withSources(
            Arrays.asList(this.source1, this.source2, this.processorStreamTime, this.processorSystemTime),
            Utils.mkMap(Utils.mkEntry("topic1", this.source1), Utils.mkEntry("topic2", this.source2)));

        this.source1.addChild(this.processorStreamTime);
        this.source2.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source2.addChild(this.processorSystemTime);

        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsMetricsImpl taskMetrics = new StreamsMetricsImpl(this.metrics, "test", str, this.time);
        final ProcessorContextImpl context =
            new ProcessorContextImpl(this.taskId, streamsConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null);

        return new StreamTask(
            this.taskId,
            this.partitions,
            topology,
            this.consumer,
            streamsConfig,
            taskMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            context);
    }

    /**
     * Creates a task over {@code partition1} whose only source is {@code sourceNode}, with the
     * stream-time processor as its child. Also records empty-result expectations on the state
     * manager and record collector mocks and switches them to replay mode.
     *
     * @param sourceNode source node registered for "topic1"
     * @return the new task
     */
    private StreamTask createStatelessTaskWithForwardingTopology(SourceNode<Integer, Integer, Integer, Integer> sourceNode) {
        final ProcessorTopology topology = withSources(
            Arrays.asList(sourceNode, this.processorStreamTime),
            Collections.singletonMap("topic1", sourceNode));
        sourceNode.addChild(this.processorStreamTime);

        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(this.stateManager, this.recordCollector);

        final StreamsConfig config = createConfig(false, "0");
        final StreamsMetricsImpl taskMetrics = new StreamsMetricsImpl(this.metrics, "test", "latest", this.time);
        final ProcessorContextImpl context =
            new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, (ThreadCache) null);

        return new StreamTask(
            this.taskId,
            Collections.singleton(this.partition1),
            topology,
            this.consumer,
            config,
            taskMetrics,
            this.stateDirectory,
            this.cache,
            this.time,
            this.stateManager,
            this.recordCollector,
            context);
    }

    /**
     * Builds a record for the given partition using the shared {@code recordKey} and
     * {@code recordValue}; {@code j} is used as both offset and timestamp.
     *
     * @param topicPartition topic and partition of the record
     * @param j              value used for the record's offset and its CREATE_TIME timestamp
     * @return the new record
     */
    private ConsumerRecord<byte[], byte[]> getConsumerRecord(TopicPartition topicPartition, long j) {
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        return new ConsumerRecord<>(
            topic,
            partition,
            j,
            j,
            TimestampType.CREATE_TIME,
            0L,
            0,
            0,
            this.recordKey,
            this.recordValue);
    }

    /**
     * Builds a record for partition 1 of "topic1" whose key is {@code num} serialized with an
     * {@link IntegerSerializer}; {@code j} is used as both offset and timestamp.
     *
     * @param num integer key to serialize
     * @param j   value used for the record's offset and its CREATE_TIME timestamp
     * @return the new record
     */
    private ConsumerRecord<byte[], byte[]> getConsumerRecord(Integer num, long j) {
        final byte[] serializedKey = new IntegerSerializer().serialize("topic1", num);
        return new ConsumerRecord<>(
            "topic1",
            1,
            j,
            j,
            TimestampType.CREATE_TIME,
            0L,
            0,
            0,
            serializedKey,
            this.recordValue);
    }

    /**
     * Registers a {@link CumulativeSum} metric on the {@code "task-closed"} thread-level sensor of
     * {@code streamsMetrics} for this test's {@code threadId}.
     *
     * @return the name under which the metric was added, for later lookup
     */
    private MetricName setupCloseTaskMetric() {
        final MetricName closeTaskMetricName =
            new MetricName("name", "group", "description", Collections.emptyMap());
        final Sensor closeTaskSensor = this.streamsMetrics.threadLevelSensor(
            this.threadId,
            "task-closed",
            Sensor.RecordingLevel.INFO,
            new Sensor[0]);
        closeTaskSensor.add(closeTaskMetricName, new CumulativeSum());
        return closeTaskMetricName;
    }

    /**
     * Asserts that the metric registered under {@code metricName} currently measures exactly {@code d}.
     *
     * @param d                  the expected measured value
     * @param streamsMetricsImpl the metrics instance whose metric map is searched
     * @param metricName         the name of the metric to look up
     */
    private void verifyCloseTaskMetric(double d, StreamsMetricsImpl streamsMetricsImpl, MetricName metricName) {
        final KafkaMetric closeMetric = (KafkaMetric) streamsMetricsImpl.metrics().get(metricName);
        // Measure against the metric's own config at the current wall-clock time.
        final double measured =
            closeMetric.measurable().measure(closeMetric.config(), System.currentTimeMillis());
        MatcherAssert.assertThat(measured, CoreMatchers.equalTo(d));
    }

    /**
     * Compiler-generated accessor that assigns the {@code punctuatedAt} field of the given test instance.
     *
     * <p>The decompiler could not decode this method and emitted a stub that always threw
     * {@link UnsupportedOperationException}. Its decode notes show the intended body is the field write
     * {@code r0.punctuatedAt = r1} followed by returning a {@code long}; this restores that behavior in
     * the usual form of an assignment accessor, returning the value that was stored.
     *
     * @param r6 the test instance whose {@code punctuatedAt} field is written
     * @param r7 the value to store
     * @return the value that was stored, i.e. {@code r7}
     */
    static /* synthetic */ long access$002(org.apache.kafka.streams.processor.internals.StreamTaskTest r6, long r7) {
        r6.punctuatedAt = r7;
        return r7;
    }

    static {
        // Intentionally empty: this static initializer contains no statements.
        // NOTE(review): presumably left behind by the decompiler — confirm and remove if so.
    }
}
