package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.IMocksControl;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.MockitoSession;
import org.mockito.quality.Strictness;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
@RunWith(EasyMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTaskTest.class */
public class StreamTaskTest {
    private static final String APPLICATION_ID = "stream-task-test";
    private static final File BASE_DIR = TestUtils.tempDirectory();
    private StateDirectory stateDirectory;
    private StreamTask task;
    private long punctuatedAt;

    @Mock(type = MockType.NICE)
    private ProcessorStateManager stateManager;

    @Mock(type = MockType.NICE)
    private RecordCollector recordCollector;

    @Mock(type = MockType.NICE)
    private ThreadCache cache;
    private MockitoSession mockito;
    private final LogContext logContext = new LogContext("[test] ");
    private final String topic1 = AssignmentTestUtils.TP_1_NAME;
    private final String topic2 = AssignmentTestUtils.TP_2_NAME;
    private final TopicPartition partition1 = new TopicPartition(AssignmentTestUtils.TP_1_NAME, 1);
    private final TopicPartition partition2 = new TopicPartition(AssignmentTestUtils.TP_2_NAME, 1);
    private final Set<TopicPartition> partitions = Utils.mkSet(new TopicPartition[]{this.partition1, this.partition2});
    private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
    private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(this.intDeserializer, this.intDeserializer) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.1
        @Override // org.apache.kafka.test.MockSourceNode
        public void process(Record<Integer, Integer> record) {
            throw new RuntimeException("KABOOM!");
        }

        @Override // org.apache.kafka.test.MockSourceNode
        public void close() {
            throw new RuntimeException("KABOOM!");
        }
    };
    private final MockSourceNode<Integer, Integer> timeoutSource = new MockSourceNode<Integer, Integer>(this.intDeserializer, this.intDeserializer) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.2
        @Override // org.apache.kafka.test.MockSourceNode
        public void process(Record<Integer, Integer> record) {
            throw new TimeoutException("Kaboom!");
        }
    };
    private final MockProcessorNode<Integer, Integer, ?, ?> processorStreamTime = new MockProcessorNode<>(10);
    private final MockProcessorNode<Integer, Integer, ?, ?> processorSystemTime = new MockProcessorNode<>(10, PunctuationType.WALL_CLOCK_TIME);
    private final String storeName = "store";
    private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
    private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 1);
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final byte[] recordValue = this.intSerializer.serialize((String) null, 10);
    private final byte[] recordKey = this.intSerializer.serialize((String) null, 1);
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0);
    private MockTime time = new MockTime();
    private Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), this.time);
    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(this.metrics);
    private final Punctuator punctuator = new Punctuator() { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.3
        /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
            jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: org.apache.kafka.streams.processor.internals.StreamTaskTest.access$002(org.apache.kafka.streams.processor.internals.StreamTaskTest, long):long
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
            	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
            Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: org.apache.kafka.streams.processor.internals.StreamTaskTest
            	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
            	... 1 more
            */
        public void punctuate(long r5) {
            /*
                r4 = this;
                r0 = r4
                org.apache.kafka.streams.processor.internals.StreamTaskTest r0 = org.apache.kafka.streams.processor.internals.StreamTaskTest.this
                r1 = r5
                long r0 = org.apache.kafka.streams.processor.internals.StreamTaskTest.access$002(r0, r1)
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.streams.processor.internals.StreamTaskTest.AnonymousClass3.punctuate(long):void");
        }
    };

    public StreamTaskTest() {
    }

    private static ProcessorTopology withRepartitionTopics(List<ProcessorNode<?, ?, ?, ?>> list, Map<String, SourceNode<?, ?>> map, Set<String> set) {
        return new ProcessorTopology(list, map, Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), set);
    }

    private static ProcessorTopology withSources(List<ProcessorNode<?, ?, ?, ?>> list, Map<String, SourceNode<?, ?>> map) {
        return new ProcessorTopology(list, map, Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());
    }

    private static StreamsConfig createConfig() {
        return createConfig("0");
    }

    private static StreamsConfig createConfig(String str) {
        return createConfig("at_least_once", str);
    }

    private static StreamsConfig createConfig(String str, String str2) {
        return createConfig(str, str2, LogAndFailExceptionHandler.class.getName());
    }

    private static StreamsConfig createConfig(String str, String str2, String str3) {
        try {
            return new StreamsConfig(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", APPLICATION_ID), Utils.mkEntry("bootstrap.servers", "localhost:2171"), Utils.mkEntry("buffered.records.per.partition", "3"), Utils.mkEntry("state.dir", BASE_DIR.getCanonicalPath()), Utils.mkEntry("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name), Utils.mkEntry("default.timestamp.extractor", MockTimestampExtractor.class.getName()), Utils.mkEntry("processing.guarantee", str), Utils.mkEntry("max.task.idle.ms", str2), Utils.mkEntry("default.deserialization.exception.handler", str3)})));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Before
    public void setup() {
        this.mockito = Mockito.mockitoSession().initMocks(this).strictness(Strictness.STRICT_STUBS).startMocking();
        EasyMock.expect(this.stateManager.taskId()).andStubReturn(this.taskId);
        EasyMock.expect(this.stateManager.taskType()).andStubReturn(Task.TaskType.ACTIVE);
        this.consumer.assign(Arrays.asList(this.partition1, this.partition2));
        this.consumer.updateBeginningOffsets(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, 0L), Utils.mkEntry(this.partition2, 0L)}));
        this.stateDirectory = new StateDirectory(createConfig("100"), new MockTime(), true, false);
    }

    @After
    public void cleanup() throws IOException {
        if (this.task != null) {
            try {
                this.task.suspend();
            } catch (IllegalStateException e) {
                if (!e.getMessage().startsWith("Illegal state CLOSED")) {
                    throw e;
                }
            } catch (RuntimeException e2) {
            }
            this.task.closeDirty();
            this.task = null;
        }
        Utils.delete(BASE_DIR);
        this.mockito.finishMocking();
    }

    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
        this.stateDirectory = (StateDirectory) EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(this.taskId))).andReturn(false);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        this.stateManager.registerStore(this.stateStore, this.stateStore.stateRestoreCallback, (CommitCallback) null);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.stateDirectory, this.stateManager});
        this.task = createStatefulTask(createConfig("100"), false);
        Assert.assertThrows(LockException.class, () -> {
            this.task.initializeIfNeeded();
        });
    }

    @Test
    public void shouldNotAttemptToLockIfNoStores() {
        this.stateDirectory = (StateDirectory) EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        EasyMock.verify(new Object[]{this.stateDirectory});
    }

    @Test
    public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException {
        IMocksControl createStrictControl = EasyMock.createStrictControl();
        ProcessorStateManager processorStateManager = (ProcessorStateManager) createStrictControl.createMock(ProcessorStateManager.class);
        EasyMock.expect(processorStateManager.taskType()).andStubReturn(Task.TaskType.ACTIVE);
        this.stateDirectory = (StateDirectory) createStrictControl.createMock(StateDirectory.class);
        processorStateManager.registerGlobalStateStores(Collections.emptyList());
        EasyMock.expectLastCall();
        EasyMock.expect(processorStateManager.taskId()).andReturn(this.taskId);
        EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(this.taskId))).andReturn(true);
        processorStateManager.close();
        EasyMock.expectLastCall();
        EasyMock.expect(processorStateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
        this.stateDirectory.unlock(this.taskId);
        EasyMock.expectLastCall();
        createStrictControl.checkOrder(true);
        createStrictControl.replay();
        this.task = createStatefulTask(createConfig("exactly_once_v2", "100"), true, processorStateManager);
        this.task.suspend();
        this.task.closeDirty();
        this.task = null;
        createStrictControl.verify();
    }

    @Test
    public void shouldResetOffsetsToLastCommittedForSpecifiedPartitions() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.addPartitionsForOffsetReset(Collections.singleton(this.partition1));
        this.consumer.seek(this.partition1, 5L);
        this.consumer.commitSync();
        this.consumer.seek(this.partition1, 10L);
        this.consumer.seek(this.partition2, 15L);
        Consumer consumer = (Consumer) EasyMock.mock(Consumer.class);
        consumer.accept(Collections.emptySet());
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{consumer});
        this.task.initializeIfNeeded();
        this.task.completeRestoration(consumer);
        MatcherAssert.assertThat(Long.valueOf(this.consumer.position(this.partition1)), CoreMatchers.equalTo(5L));
        MatcherAssert.assertThat(Long.valueOf(this.consumer.position(this.partition2)), CoreMatchers.equalTo(15L));
    }

    @Test
    public void shouldAutoOffsetResetIfNoCommittedOffsetFound() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.addPartitionsForOffsetReset(Collections.singleton(this.partition1));
        final AtomicReference atomicReference = new AtomicReference();
        MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.4
            public void seek(TopicPartition topicPartition, long j) {
                AssertionError assertionError = (AssertionError) atomicReference.get();
                if (assertionError != null) {
                    throw assertionError;
                }
                super.seek(topicPartition, j);
            }
        };
        mockConsumer.assign(Arrays.asList(this.partition1, this.partition2));
        mockConsumer.updateBeginningOffsets(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, 0L), Utils.mkEntry(this.partition2, 0L)}));
        mockConsumer.seek(this.partition1, 5L);
        mockConsumer.seek(this.partition2, 15L);
        atomicReference.set(new AssertionError("Should not seek"));
        Consumer consumer = (Consumer) EasyMock.mock(Consumer.class);
        consumer.accept(Collections.singleton(this.partition1));
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{consumer});
        this.task.initializeIfNeeded();
        this.task.completeRestoration(consumer);
        MatcherAssert.assertThat(Long.valueOf(mockConsumer.position(this.partition1)), CoreMatchers.equalTo(5L));
        MatcherAssert.assertThat(Long.valueOf(mockConsumer.position(this.partition2)), CoreMatchers.equalTo(15L));
        EasyMock.verify(new Object[]{consumer});
    }

    @Test
    public void shouldReadCommittedStreamTimeAndProcessorMetadataOnInitialize() {
        this.stateDirectory = (StateDirectory) EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.replay(new Object[]{this.stateDirectory});
        ProcessorMetadata processorMetadata = new ProcessorMetadata(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("key1", 1L), Utils.mkEntry("key2", 2L)}));
        this.consumer.commitSync((Map) this.partitions.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
            return new OffsetAndMetadata(0L, new TopicPartitionMetadata(10L, processorMetadata).encode());
        })));
        this.task = createStatelessTask(createConfig("100"));
        Assert.assertEquals(-1L, this.task.streamTime());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        Assert.assertEquals(10L, this.task.streamTime());
        Assert.assertEquals(1L, this.task.processorContext().processorMetadataForKey("key1").longValue());
        Assert.assertEquals(2L, this.task.processorContext().processorMetadataForKey("key2").longValue());
    }

    @Test
    public void shouldReadCommittedStreamTimeAndMergeProcessorMetadataOnInitialize() {
        this.stateDirectory = (StateDirectory) EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.replay(new Object[]{this.stateDirectory});
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(0L, new TopicPartitionMetadata(10L, new ProcessorMetadata(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("key1", 1L), Utils.mkEntry("key2", 2L)}))).encode()))});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition2, new OffsetAndMetadata(0L, new TopicPartitionMetadata(20L, new ProcessorMetadata(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("key1", 10L), Utils.mkEntry("key3", 30L)}))).encode()))});
        this.consumer.commitSync(mkMap);
        this.consumer.commitSync(mkMap2);
        this.task = createStatelessTask(createConfig("100"));
        Assert.assertEquals(-1L, this.task.streamTime());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        Assert.assertEquals(20L, this.task.streamTime());
        Assert.assertEquals(10L, this.task.processorContext().processorMetadataForKey("key1").longValue());
        Assert.assertEquals(2L, this.task.processorContext().processorMetadataForKey("key2").longValue());
        Assert.assertEquals(30L, this.task.processorContext().processorMetadataForKey("key3").longValue());
    }

    @Test
    public void shouldTransitToRestoringThenRunningAfterCreation() throws IOException {
        this.stateDirectory = (StateDirectory) EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(this.taskId))).andReturn(true);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.changelogPartition));
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.changelogPartition, 10L));
        this.stateManager.registerStore(this.stateStore, this.stateStore.stateRestoreCallback, (CommitCallback) null);
        EasyMock.expectLastCall();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateDirectory, this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig("100"), true);
        Assert.assertEquals(Task.State.CREATED, this.task.state());
        this.task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RESTORING, this.task.state());
        Assert.assertFalse(this.source1.initialized);
        Assert.assertFalse(this.source2.initialized);
        this.task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RESTORING, this.task.state());
        this.task.completeRestoration(set -> {
        });
        Assert.assertEquals(Task.State.RUNNING, this.task.state());
        Assert.assertTrue(this.source1.initialized);
        Assert.assertTrue(this.source2.initialized);
        EasyMock.verify(new Object[]{this.stateDirectory});
    }

    @Test
    public void shouldProcessInOrder() {
        this.task = createStatelessTask(createConfig());
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L, 101), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L, 102), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L, 103)));
        this.task.addRecords(this.partition2, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 25L, 201), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 35L, 202), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 45L, 203)));
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(5L, this.task.numBuffered());
        Assert.assertEquals(1L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(Collections.singletonList(101), this.source1.values);
        Assert.assertEquals(Collections.emptyList(), this.source2.values);
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(4L, this.task.numBuffered());
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102), this.source1.values);
        Assert.assertEquals(Collections.emptyList(), this.source2.values);
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(3L, this.task.numBuffered());
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102), this.source1.values);
        Assert.assertEquals(Collections.singletonList(201), this.source2.values);
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(2L, this.task.numBuffered());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102, 103), this.source1.values);
        Assert.assertEquals(Collections.singletonList(201), this.source2.values);
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(1L, this.task.numBuffered());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(2L, this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102, 103), this.source1.values);
        Assert.assertEquals(Arrays.asList(201, 202), this.source2.values);
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(0L, this.task.numBuffered());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(3L, this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102, 103), this.source1.values);
        Assert.assertEquals(Arrays.asList(201, 202, 203), this.source2.values);
    }

    @Test
    public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
        this.task = createSingleSourceStateless(createConfig(), "latest");
        Assert.assertFalse(this.task.process(this.time.milliseconds()));
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L)));
        Assert.assertTrue(this.task.process(this.time.milliseconds()));
        this.task.prepareCommit();
        Assert.assertTrue(this.task.process(this.time.milliseconds()));
        this.task.postCommit(false);
        Assert.assertTrue(this.task.process(this.time.milliseconds()));
        Assert.assertFalse(this.task.process(this.time.milliseconds()));
    }

    @Test
    public void shouldNotProcessRecordsAfterPrepareCommitWhenEosAlphaEnabled() {
        this.task = createSingleSourceStateless(createConfig("exactly_once", "0"), "latest");
        Assert.assertFalse(this.task.process(this.time.milliseconds()));
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L)));
        Assert.assertTrue(this.task.process(this.time.milliseconds()));
        this.task.prepareCommit();
        Assert.assertFalse(this.task.process(this.time.milliseconds()));
        this.task.postCommit(false);
        Assert.assertTrue(this.task.process(this.time.milliseconds()));
        Assert.assertTrue(this.task.process(this.time.milliseconds()));
        Assert.assertFalse(this.task.process(this.time.milliseconds()));
    }

    @Test
    public void shouldNotProcessRecordsAfterPrepareCommitWhenEosV2Enabled() {
        this.task = createSingleSourceStateless(createConfig("exactly_once_v2", "0"), "latest");
        Assert.assertFalse(this.task.process(this.time.milliseconds()));
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L)));
        Assert.assertTrue(this.task.process(this.time.milliseconds()));
        this.task.prepareCommit();
        Assert.assertFalse(this.task.process(this.time.milliseconds()));
        this.task.postCommit(false);
        Assert.assertTrue(this.task.process(this.time.milliseconds()));
        Assert.assertTrue(this.task.process(this.time.milliseconds()));
        Assert.assertFalse(this.task.process(this.time.milliseconds()));
    }

    @Test
    public void shouldRecordBufferedRecords() {
        this.task = createSingleSourceStateless(createConfig("at_least_once", "0"), "latest");
        KafkaMetric metric = getMetric("active-buffer", "%s-count", this.task.id().toString());
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(0.0d)));
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L)));
        this.task.recordProcessTimeRatioAndBufferSize(100L, this.time.milliseconds());
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(2.0d)));
        Assert.assertTrue(this.task.process(0L));
        this.task.recordProcessTimeRatioAndBufferSize(100L, this.time.milliseconds());
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(1.0d)));
    }

    @Test
    public void shouldRecordProcessRatio() {
        this.task = createStatelessTask(createConfig());
        KafkaMetric metric = getMetric("active-process", "%s-ratio", this.task.id().toString());
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(0.0d)));
        this.task.recordProcessBatchTime(10L);
        this.task.recordProcessBatchTime(15L);
        this.task.recordProcessTimeRatioAndBufferSize(100L, this.time.milliseconds());
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(0.25d)));
        this.task.recordProcessBatchTime(10L);
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(0.25d)));
        this.task.recordProcessBatchTime(10L);
        this.task.recordProcessTimeRatioAndBufferSize(20L, this.time.milliseconds());
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(1.0d)));
    }

    @Test
    public void shouldRecordE2ELatencyOnSourceNodeAndTerminalNodes() {
        this.time = new MockTime(0L, 0L, 0L);
        this.metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), this.time);
        MockSourceNode<Integer, Integer> mockSourceNode = new MockSourceNode<Integer, Integer>(this.intDeserializer, this.intDeserializer) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.5
            InternalProcessorContext<Integer, Integer> context;

            @Override // org.apache.kafka.test.MockSourceNode
            public void init(InternalProcessorContext<Integer, Integer> internalProcessorContext) {
                this.context = internalProcessorContext;
                super.init(internalProcessorContext);
            }

            @Override // org.apache.kafka.test.MockSourceNode
            public void process(Record<Integer, Integer> record) {
                if (((Integer) record.key()).intValue() % 2 == 0) {
                    this.context.forward(record);
                }
            }
        };
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
        this.task = createStatelessTaskWithForwardingTopology(mockSourceNode);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        String name = mockSourceNode.name();
        String name2 = this.processorStreamTime.name();
        Metric processorMetric = getProcessorMetric("record-e2e-latency", "%s-avg", this.task.id().toString(), name, "latest");
        Metric processorMetric2 = getProcessorMetric("record-e2e-latency", "%s-min", this.task.id().toString(), name, "latest");
        Metric processorMetric3 = getProcessorMetric("record-e2e-latency", "%s-max", this.task.id().toString(), name, "latest");
        Metric processorMetric4 = getProcessorMetric("record-e2e-latency", "%s-avg", this.task.id().toString(), name2, "latest");
        Metric processorMetric5 = getProcessorMetric("record-e2e-latency", "%s-min", this.task.id().toString(), name2, "latest");
        Metric processorMetric6 = getProcessorMetric("record-e2e-latency", "%s-max", this.task.id().toString(), name2, "latest");
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp((Integer) 0, 0L)));
        this.task.process(10L);
        MatcherAssert.assertThat(processorMetric.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric2.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric3.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric4.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric5.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric6.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp((Integer) 1, 0L)));
        this.task.process(15L);
        MatcherAssert.assertThat(processorMetric.metricValue(), CoreMatchers.equalTo(Double.valueOf(12.5d)));
        MatcherAssert.assertThat(processorMetric2.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric3.metricValue(), CoreMatchers.equalTo(Double.valueOf(15.0d)));
        MatcherAssert.assertThat(processorMetric4.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric5.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric6.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp((Integer) 2, 0L)));
        this.task.process(23L);
        MatcherAssert.assertThat(processorMetric.metricValue(), CoreMatchers.equalTo(Double.valueOf(16.0d)));
        MatcherAssert.assertThat(processorMetric2.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric3.metricValue(), CoreMatchers.equalTo(Double.valueOf(23.0d)));
        MatcherAssert.assertThat(processorMetric4.metricValue(), CoreMatchers.equalTo(Double.valueOf(16.5d)));
        MatcherAssert.assertThat(processorMetric5.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric6.metricValue(), CoreMatchers.equalTo(Double.valueOf(23.0d)));
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp((Integer) 3, 0L)));
        this.task.process(5L);
        MatcherAssert.assertThat(processorMetric.metricValue(), CoreMatchers.equalTo(Double.valueOf(13.25d)));
        MatcherAssert.assertThat(processorMetric2.metricValue(), CoreMatchers.equalTo(Double.valueOf(5.0d)));
        MatcherAssert.assertThat(processorMetric3.metricValue(), CoreMatchers.equalTo(Double.valueOf(23.0d)));
        MatcherAssert.assertThat(processorMetric4.metricValue(), CoreMatchers.equalTo(Double.valueOf(16.5d)));
        MatcherAssert.assertThat(processorMetric5.metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(processorMetric6.metricValue(), CoreMatchers.equalTo(Double.valueOf(23.0d)));
    }

    @Test
    public void shouldRecordRestoredRecords() {
        this.task = createSingleSourceStateless(createConfig("at_least_once", "0"), "latest");
        KafkaMetric metric = getMetric("restore", "%s-total", this.task.id().toString());
        KafkaMetric metric2 = getMetric("restore", "%s-rate", this.task.id().toString());
        KafkaMetric metric3 = getMetric("restore", "%s-remaining-records-total", this.task.id().toString());
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(0.0d)));
        MatcherAssert.assertThat(metric2.metricValue(), CoreMatchers.equalTo(Double.valueOf(0.0d)));
        MatcherAssert.assertThat(metric3.metricValue(), CoreMatchers.equalTo(Double.valueOf(0.0d)));
        this.task.recordRestoration(this.time, 100L, true);
        MatcherAssert.assertThat(metric3.metricValue(), CoreMatchers.equalTo(Double.valueOf(100.0d)));
        this.task.recordRestoration(this.time, 25L, false);
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(25.0d)));
        MatcherAssert.assertThat(metric2.metricValue(), Matchers.not(Double.valueOf(0.0d)));
        MatcherAssert.assertThat(metric3.metricValue(), CoreMatchers.equalTo(Double.valueOf(75.0d)));
        this.task.recordRestoration(this.time, 50L, false);
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(75.0d)));
        MatcherAssert.assertThat(metric2.metricValue(), Matchers.not(Double.valueOf(0.0d)));
        MatcherAssert.assertThat(metric3.metricValue(), CoreMatchers.equalTo(Double.valueOf(25.0d)));
    }

    @Test
    public void shouldThrowOnTimeoutExceptionAndBufferRecordForRetryIfEosDisabled() {
        createTimeoutTask("at_least_once");
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp((Integer) 0, 0L)));
        MatcherAssert.assertThat(Assert.assertThrows(TimeoutException.class, () -> {
            this.task.process(0L);
        }).getMessage(), CoreMatchers.equalTo("Kaboom!"));
        MatcherAssert.assertThat(Boolean.valueOf(this.task.commitNeeded()), CoreMatchers.equalTo(false));
        MatcherAssert.assertThat(Boolean.valueOf(this.task.hasRecordsQueued()), CoreMatchers.equalTo(false));
        MatcherAssert.assertThat(Assert.assertThrows(TimeoutException.class, () -> {
            this.task.process(0L);
        }).getMessage(), CoreMatchers.equalTo("Kaboom!"));
    }

    @Test
    public void shouldThrowTaskCorruptedExceptionOnTimeoutExceptionIfEosEnabled() {
        createTimeoutTask("exactly_once_v2");
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp((Integer) 0, 0L)));
        Assert.assertThrows(TaskCorruptedException.class, () -> {
            this.task.process(0L);
        });
    }

    @Test
    public void testMetrics() {
        this.task = createStatelessTask(createConfig("100"));
        Assert.assertNotNull(getMetric("enforced-processing", "%s-rate", this.task.id().toString()));
        Assert.assertNotNull(getMetric("enforced-processing", "%s-total", this.task.id().toString()));
        Assert.assertNotNull(getMetric("record-lateness", "%s-avg", this.task.id().toString()));
        Assert.assertNotNull(getMetric("record-lateness", "%s-max", this.task.id().toString()));
        Assert.assertNotNull(getMetric("active-process", "%s-ratio", this.task.id().toString()));
        Assert.assertNotNull(getMetric("active-buffer", "%s-count", this.task.id().toString()));
        testMetricsForBuiltInMetricsVersionLatest();
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.contextChange(new KafkaMetricsContext("kafka.streams"));
        this.metrics.addReporter(jmxReporter);
        Assert.assertTrue(jmxReporter.containsMbean(String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=%s", "thread-id", this.threadId, this.task.id())));
    }

    private void testMetricsForBuiltInMetricsVersionLatest() {
        Assert.assertNull(getMetric("commit", "%s-latency-avg", "all"));
        Assert.assertNull(getMetric("commit", "%s-latency-max", "all"));
        Assert.assertNull(getMetric("commit", "%s-rate", "all"));
        Assert.assertNull(getMetric("commit", "%s-total", "all"));
        Assert.assertNotNull(getMetric("process", "%s-latency-max", this.task.id().toString()));
        Assert.assertNotNull(getMetric("process", "%s-latency-avg", this.task.id().toString()));
        Assert.assertNotNull(getMetric("punctuate", "%s-latency-avg", this.task.id().toString()));
        Assert.assertNotNull(getMetric("punctuate", "%s-latency-max", this.task.id().toString()));
        Assert.assertNotNull(getMetric("punctuate", "%s-rate", this.task.id().toString()));
        Assert.assertNotNull(getMetric("punctuate", "%s-total", this.task.id().toString()));
    }

    private KafkaMetric getMetric(String str, String str2, String str3) {
        return (KafkaMetric) this.metrics.metrics().get(this.metrics.metricName(String.format(str2, str), "stream-task-metrics", "", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("task-id", str3), Utils.mkEntry("thread-id", Thread.currentThread().getName())})));
    }

    private Metric getProcessorMetric(String str, String str2, String str3, String str4, String str5) {
        return StreamsTestUtils.getMetricByNameFilterByTags(this.metrics.metrics(), String.format(str2, str), "stream-processor-node-metrics", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("task-id", str3), Utils.mkEntry("processor-node-id", str4), Utils.mkEntry("thread-id", Thread.currentThread().getName())}));
    }

    @Test
    public void shouldPauseAndResumeBasedOnBufferedRecords() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L)));
        this.task.addRecords(this.partition2, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 35L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 45L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 55L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 65L)));
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(1L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(1L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 40L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 50L)));
        Assert.assertEquals(2L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition1));
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(1L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(1L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertEquals(0L, this.consumer.paused().size());
    }

    @Test
    public void shouldPunctuateOnceStreamTimeAfterGap() {
        this.task = createStatelessTask(createConfig());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 142L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 155L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 160L)));
        this.task.addRecords(this.partition2, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 25L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 145L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 159L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 161L)));
        Assert.assertFalse(this.task.canPunctuateStreamTime());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(7L, this.task.numBuffered());
        Assert.assertEquals(1L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertTrue(this.task.canPunctuateStreamTime());
        Assert.assertTrue(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(6L, this.task.numBuffered());
        Assert.assertEquals(1L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertFalse(this.task.canPunctuateStreamTime());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(5L, this.task.numBuffered());
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertTrue(this.task.canPunctuateStreamTime());
        Assert.assertTrue(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(4L, this.task.numBuffered());
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(2L, this.source2.numReceived);
        Assert.assertFalse(this.task.canPunctuateStreamTime());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(3L, this.task.numBuffered());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(2L, this.source2.numReceived);
        Assert.assertTrue(this.task.canPunctuateStreamTime());
        Assert.assertTrue(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(2L, this.task.numBuffered());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(3L, this.source2.numReceived);
        Assert.assertFalse(this.task.canPunctuateStreamTime());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(1L, this.task.numBuffered());
        Assert.assertEquals(4L, this.source1.numReceived);
        Assert.assertEquals(3L, this.source2.numReceived);
        Assert.assertTrue(this.task.canPunctuateStreamTime());
        Assert.assertTrue(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertEquals(0L, this.task.numBuffered());
        Assert.assertEquals(4L, this.source1.numReceived);
        Assert.assertEquals(4L, this.source2.numReceived);
        Assert.assertFalse(this.task.canPunctuateStreamTime());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        this.processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20, 142, 155, 160);
    }

    @Test
    public void shouldRespectPunctuateCancellationStreamTime() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 40L)));
        this.task.addRecords(this.partition2, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 25L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 35L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 45L)));
        Assert.assertFalse(this.task.canPunctuateStreamTime());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertTrue(this.task.canPunctuateStreamTime());
        Assert.assertTrue(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        Assert.assertFalse(this.task.canPunctuateStreamTime());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process(0L));
        this.processorStreamTime.mockProcessor.scheduleCancellable().cancel();
        Assert.assertFalse(this.task.canPunctuateStreamTime());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        this.processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20);
    }

    @Test
    public void shouldRespectPunctuateCancellationSystemTime() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        long milliseconds = this.time.milliseconds();
        this.time.sleep(10L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.scheduleCancellable().cancel();
        this.time.sleep(10L);
        Assert.assertFalse(this.task.canPunctuateSystemTime());
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, milliseconds + 10);
    }

    @Test
    public void shouldRespectCommitNeeded() {
        this.task = createSingleSourceStateless(createConfig("at_least_once", "0"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        Assert.assertFalse(this.task.commitNeeded());
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        Assert.assertTrue(this.task.process(0L));
        Assert.assertTrue(this.task.commitNeeded());
        this.task.prepareCommit();
        Assert.assertTrue(this.task.commitNeeded());
        this.task.postCommit(true);
        Assert.assertFalse(this.task.commitNeeded());
        Assert.assertTrue(this.task.canPunctuateStreamTime());
        Assert.assertTrue(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.commitNeeded());
        this.task.prepareCommit();
        Assert.assertTrue(this.task.commitNeeded());
        this.task.postCommit(true);
        Assert.assertFalse(this.task.commitNeeded());
        this.time.sleep(10L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        Assert.assertTrue(this.task.commitNeeded());
        this.task.prepareCommit();
        Assert.assertTrue(this.task.commitNeeded());
        this.task.postCommit(true);
        Assert.assertFalse(this.task.commitNeeded());
    }

    @Test
    public void shouldCommitNextOffsetAndProcessorMetadataFromQueueIfAvailable() {
        this.task = createSingleSourceStateless(createConfig("at_least_once", "0"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 3L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 5L)));
        this.task.process(0L);
        this.processorStreamTime.mockProcessor.addProcessorMetadata("key1", 100L);
        this.task.process(0L);
        this.processorSystemTime.mockProcessor.addProcessorMetadata("key2", 200L);
        MatcherAssert.assertThat(this.task.prepareCommit(), CoreMatchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(5L, new TopicPartitionMetadata(3L, new ProcessorMetadata(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("key1", 100L), Utils.mkEntry("key2", 200L)}))).encode()))})));
    }

    @Test
    public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
        this.task = createStatelessTask(createConfig());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L));
        this.consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 1L));
        this.consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 2L));
        this.consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 0L));
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        this.task.addRecords(this.partition2, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 0L)));
        this.task.process(0L);
        TopicPartitionMetadata topicPartitionMetadata = new TopicPartitionMetadata(0L, new ProcessorMetadata());
        Assert.assertTrue(this.task.commitNeeded());
        MatcherAssert.assertThat(this.task.prepareCommit(), CoreMatchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(3L, topicPartitionMetadata.encode()))})));
        this.task.postCommit(false);
        Assert.assertTrue(this.task.commitNeeded());
        this.consumer.poll(Duration.ZERO);
        this.task.process(0L);
        Assert.assertTrue(this.task.commitNeeded());
        MatcherAssert.assertThat(this.task.prepareCommit(), CoreMatchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(3L, topicPartitionMetadata.encode())), Utils.mkEntry(this.partition2, new OffsetAndMetadata(1L, topicPartitionMetadata.encode()))})));
        this.task.postCommit(false);
        Assert.assertFalse(this.task.commitNeeded());
    }

    @Test
    public void shouldCommitOldProcessorMetadataWhenNotDirty() {
        this.task = createStatelessTask(createConfig());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L));
        this.consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 1L));
        this.consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 0L));
        this.consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 1L));
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 1L)));
        this.task.process(0L);
        this.processorStreamTime.mockProcessor.addProcessorMetadata("key1", 100L);
        TopicPartitionMetadata topicPartitionMetadata = new TopicPartitionMetadata(0L, new ProcessorMetadata(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("key1", 100L)})));
        TopicPartitionMetadata topicPartitionMetadata2 = new TopicPartitionMetadata(-1L, new ProcessorMetadata(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("key1", 100L)})));
        Assert.assertTrue(this.task.commitNeeded());
        MatcherAssert.assertThat(this.task.prepareCommit(), CoreMatchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(1L, topicPartitionMetadata.encode())), Utils.mkEntry(this.partition2, new OffsetAndMetadata(2L, topicPartitionMetadata2.encode()))})));
        this.task.postCommit(false);
        Assert.assertTrue(this.task.commitNeeded());
        this.consumer.poll(Duration.ZERO);
        this.task.process(0L);
        TopicPartitionMetadata topicPartitionMetadata3 = new TopicPartitionMetadata(1L, new ProcessorMetadata(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("key1", 100L)})));
        Assert.assertTrue(this.task.commitNeeded());
        MatcherAssert.assertThat(this.task.prepareCommit(), CoreMatchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(2L, topicPartitionMetadata3.encode()))})));
        this.task.postCommit(false);
        Assert.assertFalse(this.task.commitNeeded());
    }

    @Test
    public void shouldFailOnCommitIfTaskIsClosed() {
        this.task = createStatelessTask(createConfig());
        this.task.suspend();
        this.task.transitionTo(Task.State.CLOSED);
        StreamTask streamTask = this.task;
        streamTask.getClass();
        MatcherAssert.assertThat(((IllegalStateException) Assert.assertThrows(IllegalStateException.class, streamTask::prepareCommit)).getMessage(), Matchers.is("Illegal state CLOSED while preparing active task 0_0 for committing"));
    }

    @Test
    public void shouldRespectCommitRequested() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.requestCommit();
        Assert.assertTrue(this.task.commitRequested());
    }

    @Test
    public void shouldBeProcessableIfAllPartitionsBuffered() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        MatcherAssert.assertThat("task is not idling", !this.task.timeCurrentIdlingStarted().isPresent());
        Assert.assertFalse(this.task.process(0L));
        byte[] array = ByteBuffer.allocate(4).putInt(1).array();
        this.task.addRecords(this.partition1, Collections.singleton(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 0L, array, array)));
        Assert.assertFalse(this.task.process(0L));
        MatcherAssert.assertThat("task is idling", this.task.timeCurrentIdlingStarted().isPresent());
        this.task.addRecords(this.partition2, Collections.singleton(new ConsumerRecord(AssignmentTestUtils.TP_2_NAME, 1, 0L, array, array)));
        Assert.assertTrue(this.task.process(0L));
        MatcherAssert.assertThat("task is not idling", !this.task.timeCurrentIdlingStarted().isPresent());
    }

    @Test
    public void shouldBeRecordIdlingTimeIfSuspended() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.suspend();
        MatcherAssert.assertThat("task is idling", this.task.timeCurrentIdlingStarted().isPresent());
        this.task.resume();
        MatcherAssert.assertThat("task is not idling", !this.task.timeCurrentIdlingStarted().isPresent());
    }

    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        long milliseconds = this.time.milliseconds();
        this.time.sleep(10L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        this.time.sleep(10L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        this.time.sleep(9L);
        Assert.assertFalse(this.task.canPunctuateSystemTime());
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.time.sleep(1L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        this.time.sleep(20L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        Assert.assertFalse(this.task.canPunctuateSystemTime());
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, milliseconds + 10, milliseconds + 20, milliseconds + 30, milliseconds + 50);
    }

    @Test
    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        Assert.assertFalse(this.task.canPunctuateSystemTime());
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.time.sleep(9L);
        Assert.assertFalse(this.task.canPunctuateSystemTime());
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, new long[0]);
    }

    @Test
    public void shouldPunctuateOnceSystemTimeAfterGap() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        long milliseconds = this.time.milliseconds();
        this.time.sleep(100L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        Assert.assertFalse(this.task.canPunctuateSystemTime());
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.time.sleep(10L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        this.time.sleep(12L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        this.time.sleep(7L);
        Assert.assertFalse(this.task.canPunctuateSystemTime());
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.time.sleep(1L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        this.time.sleep(105L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        Assert.assertFalse(this.task.canPunctuateSystemTime());
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.time.sleep(5L);
        Assert.assertTrue(this.task.canPunctuateSystemTime());
        Assert.assertTrue(this.task.maybePunctuateSystemTime());
        Assert.assertFalse(this.task.canPunctuateSystemTime());
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, milliseconds + 100, milliseconds + 110, milliseconds + 122, milliseconds + 130, milliseconds + 235, milliseconds + 240);
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        try {
            this.task.punctuate(this.processorStreamTime, 1L, PunctuationType.STREAM_TIME, j -> {
                throw new KafkaException("KABOOM!");
            });
            Assert.fail("Should've thrown StreamsException");
        } catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue("message=" + message + " should contain processor", message.contains("processor '" + this.processorStreamTime.name() + "'"));
            MatcherAssert.assertThat(this.task.processorContext().currentNode(), CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        try {
            this.task.punctuate(this.processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, j -> {
                throw new KafkaException("KABOOM!");
            });
            Assert.fail("Should've thrown StreamsException");
        } catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue("message=" + message + " should contain processor", message.contains("processor '" + this.processorSystemTime.name() + "'"));
            MatcherAssert.assertThat(this.task.processorContext().currentNode(), CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldNotShareHeadersBetweenPunctuateIterations() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.punctuate(this.processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, j -> {
            this.task.processorContext().headers().add("dummy", (byte[]) null);
        });
        this.task.punctuate(this.processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, j2 -> {
            Assert.assertFalse(this.task.processorContext().headers().iterator().hasNext());
        });
    }

    @Test
    public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createFaultyStatefulTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L)));
        this.task.addRecords(this.partition2, Arrays.asList(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 5L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 35L), getConsumerRecordWithOffsetAsTimestamp(this.partition2, 45L)));
        MatcherAssert.assertThat("Map did not contain the partitions", this.task.highWaterMark().containsKey(this.partition1) && this.task.highWaterMark().containsKey(this.partition2));
        Assert.assertThrows(StreamsException.class, () -> {
            this.task.process(0L);
        });
    }

    @Test
    public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() throws IOException {
        this.stateDirectory = (StateDirectory) EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(this.taskId))).andReturn(true);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.recordCollector, this.stateDirectory, this.stateManager});
        this.task = createDisconnectedTask(createConfig("100"));
        this.task.transitionTo(Task.State.RESTORING);
        Assert.assertThrows(TimeoutException.class, () -> {
            this.task.completeRestoration(set -> {
            });
        });
    }

    @Test
    public void shouldReInitializeTopologyWhenResuming() throws IOException {
        this.stateDirectory = (StateDirectory) EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(this.taskId))).andReturn(true);
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).andThrow(new AssertionError("Should not try to read offsets")).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(new Object[]{this.recordCollector, this.stateDirectory, this.stateManager});
        this.task = createStatefulTask(createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.suspend();
        Assert.assertEquals(Task.State.SUSPENDED, this.task.state());
        Assert.assertFalse(this.source1.initialized);
        Assert.assertFalse(this.source2.initialized);
        this.task.resume();
        Assert.assertEquals(Task.State.RESTORING, this.task.state());
        Assert.assertFalse(this.source1.initialized);
        Assert.assertFalse(this.source2.initialized);
        this.task.completeRestoration(set -> {
        });
        Assert.assertEquals(Task.State.RUNNING, this.task.state());
        Assert.assertTrue(this.source1.initialized);
        Assert.assertTrue(this.source2.initialized);
        EasyMock.verify(new Object[]{this.stateManager, this.recordCollector});
        EasyMock.reset(new Object[]{this.recordCollector});
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap());
        EasyMock.replay(new Object[]{this.recordCollector});
        MatcherAssert.assertThat("Map did not contain the partition", this.task.highWaterMark().containsKey(this.partition1));
    }

    @Test
    public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.singletonMap(this.changelogPartition, 543L)).anyTimes();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.changelogPartition, 10L)).andReturn(Collections.singletonMap(this.changelogPartition, 10L)).andReturn(Collections.singletonMap(this.changelogPartition, 20L));
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.prepareCommit();
        this.task.postCommit(true);
        this.task.prepareCommit();
        this.task.postCommit(false);
        EasyMock.verify(new Object[]{this.stateManager, this.recordCollector});
        MatcherAssert.assertThat("Map was empty", this.task.highWaterMark().size() == 2);
    }

    @Test
    public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() {
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.singletonMap(this.changelogPartition, 543L)).anyTimes();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().times(2);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.changelogPartition));
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.changelogPartition, 0L)).andReturn(Collections.singletonMap(this.changelogPartition, 10L)).andReturn(Collections.singletonMap(this.changelogPartition, 12000L));
        this.stateManager.registerStore(this.stateStore, this.stateStore.stateRestoreCallback, (CommitCallback) null);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.prepareCommit();
        this.task.postCommit(true);
        this.task.prepareCommit();
        this.task.postCommit(false);
        EasyMock.verify(new Object[]{this.recordCollector});
        MatcherAssert.assertThat("Map was empty", this.task.highWaterMark().size() == 2);
    }

    @Test
    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.changelogPartition));
        this.stateManager.registerStore(this.stateStore, this.stateStore.stateRestoreCallback, (CommitCallback) null);
        EasyMock.expectLastCall();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig("exactly_once_v2", "100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.prepareCommit();
        this.task.postCommit(false);
        Assert.assertFalse(new File(this.stateDirectory.getOrCreateDirectoryForTask(this.taskId), ".checkpoint").exists());
    }

    @Test
    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.processorContext().setCurrentNode(this.processorStreamTime);
        try {
            this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator);
            Assert.fail("Should throw illegal state exception as current node is not null");
        } catch (IllegalStateException e) {
        }
    }

    @Test
    public void shouldCallPunctuateOnPassedInProcessorNode() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.punctuate(this.processorStreamTime, 5L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat(Long.valueOf(this.punctuatedAt), CoreMatchers.equalTo(5L));
        this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat(Long.valueOf(this.punctuatedAt), CoreMatchers.equalTo(10L));
    }

    @Test
    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccessfulPunctuate() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.punctuate(this.processorStreamTime, 5L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat(this.task.processorContext().currentNode(), CoreMatchers.nullValue());
    }

    @Test
    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
        this.task = createStatelessTask(createConfig("100"));
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.task.schedule(1L, PunctuationType.STREAM_TIME, j -> {
            });
        });
    }

    @Test
    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.processorContext().setCurrentNode(this.processorStreamTime);
        this.task.schedule(1L, PunctuationType.STREAM_TIME, j -> {
        });
    }

    @Test
    public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expectLastCall();
        this.stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createFaultyStatefulTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        Assert.assertThrows(RuntimeException.class, () -> {
            this.task.suspend();
        });
        this.task.closeDirty();
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
        TopicPartition topicPartition = new TopicPartition("repartition", 1);
        ProcessorTopology withRepartitionTopics = withRepartitionTopics(Arrays.asList(this.source1, this.source2), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(AssignmentTestUtils.TP_1_NAME, this.source1), Utils.mkEntry(topicPartition.topic(), this.source2)}), Collections.singleton(topicPartition.topic()));
        this.consumer.assign(Arrays.asList(this.partition1, topicPartition));
        this.consumer.updateBeginningOffsets(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(topicPartition, 0L)}));
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        EasyMock.expect(this.stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        StreamsConfig createConfig = createConfig();
        this.task = new StreamTask(this.taskId, Utils.mkSet(new TopicPartition[]{this.partition1, topicPartition}), withRepartitionTopics, this.consumer, new TopologyConfig((String) null, createConfig, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, this.time, this.stateManager, this.recordCollector, new ProcessorContextImpl(this.taskId, createConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null), this.logContext);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 5L)));
        this.task.addRecords(topicPartition, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(topicPartition, 10L)));
        Assert.assertTrue(this.task.process(0L));
        Assert.assertTrue(this.task.process(0L));
        this.task.prepareCommit();
        MatcherAssert.assertThat(this.task.purgeableOffsets(), CoreMatchers.equalTo(Collections.singletonMap(topicPartition, 11L)));
    }

    @Test
    public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.partition1));
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createOptimizedStatefulTask(createConfig("100"), new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.6
            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set) {
                throw new KafkaException("KABOOM!");
            }
        });
        this.task.transitionTo(Task.State.RESTORING);
        Assert.assertThrows(StreamsException.class, () -> {
            this.task.completeRestoration(set -> {
            });
        });
    }

    @Test
    public void shouldThrowIfCommittingOnIllegalState() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.transitionTo(Task.State.SUSPENDED);
        this.task.transitionTo(Task.State.CLOSED);
        StreamTask streamTask = this.task;
        streamTask.getClass();
        Assert.assertThrows(IllegalStateException.class, streamTask::prepareCommit);
    }

    @Test
    public void shouldThrowIfPostCommittingOnIllegalState() {
        this.task = createStatelessTask(createConfig("100"));
        this.task.transitionTo(Task.State.SUSPENDED);
        this.task.transitionTo(Task.State.CLOSED);
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.task.postCommit(true);
        });
    }

    @Test
    public void shouldSkipCheckpointingSuspendedCreatedTask() {
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig("100"), true);
        this.task.suspend();
        this.task.postCommit(true);
    }

    @Test
    public void shouldCheckpointForSuspendedTask() {
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition1, 1L));
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.suspend();
        this.task.postCommit(true);
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition1, 0L)).andReturn(Collections.singletonMap(this.partition1, 1L)).andReturn(Collections.singletonMap(this.partition1, 2L));
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap());
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.prepareCommit();
        this.task.postCommit(false);
        this.task.suspend();
        this.task.postCommit(false);
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldCheckpointForSuspendedRunningTaskWithLargeProgress() {
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap());
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition1, 0L)).andReturn(Collections.singletonMap(this.partition1, 12000L)).andReturn(Collections.singletonMap(this.partition1, 24000L));
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().times(2);
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.prepareCommit();
        this.task.postCommit(false);
        this.task.suspend();
        this.task.postCommit(false);
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldCheckpointWhileUpdateSnapshotWithTheConsumedOffsetsForSuspendedRunningTask() {
        Map singletonMap = Collections.singletonMap(this.partition1, 1L);
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        this.stateManager.updateChangelogOffsets((Map) EasyMock.eq(singletonMap));
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).andReturn(singletonMap);
        EasyMock.expect(this.recordCollector.offsets()).andReturn(singletonMap).times(2);
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig(), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Collections.singleton(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L)));
        this.task.addRecords(this.partition2, Collections.singleton(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 10L)));
        this.task.process(100L);
        Assert.assertTrue(this.task.commitNeeded());
        this.task.suspend();
        this.task.postCommit(true);
        EasyMock.verify(new Object[]{this.stateManager, this.recordCollector});
    }

    @Test
    public void shouldReturnStateManagerChangelogOffsets() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition1, 50L)).anyTimes();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.partition1)).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.initializeIfNeeded();
        Assert.assertEquals(Collections.singletonMap(this.partition1, 50L), this.task.changelogOffsets());
        this.task.completeRestoration(set -> {
        });
        Assert.assertEquals(Collections.singletonMap(this.partition1, -2L), this.task.changelogOffsets());
    }

    @Test
    public void shouldNotCheckpointOnCloseCreated() {
        this.stateManager.flush();
        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        MetricName metricName = setupCloseTaskMetric();
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.suspend();
        this.task.closeClean();
        Assert.assertEquals(Task.State.CLOSED, this.task.state());
        Assert.assertFalse(this.source1.initialized);
        Assert.assertFalse(this.source1.closed);
        EasyMock.verify(new Object[]{this.stateManager, this.recordCollector});
        verifyCloseTaskMetric(1.0d, this.streamsMetrics, metricName);
    }

    @Test
    public void shouldCheckpointOnCloseRestoringIfNoProgress() {
        this.stateManager.flush();
        EasyMock.expectLastCall().once();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(true);
        this.task.closeClean();
        Assert.assertEquals(Task.State.CLOSED, this.task.state());
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldAlwaysCheckpointStateIfEnforced() {
        this.stateManager.flush();
        EasyMock.expectLastCall().once();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        EasyMock.expect(this.recordCollector.offsets()).andStubReturn(Collections.emptyMap());
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.initializeIfNeeded();
        this.task.maybeCheckpoint(true);
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
        this.stateManager.flush();
        EasyMock.expectLastCall().once();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition1, 50L)).andReturn(Collections.singletonMap(this.partition1, 11000L)).andReturn(Collections.singletonMap(this.partition1, 12000L));
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.initializeIfNeeded();
        this.task.maybeCheckpoint(false);
        Assert.assertTrue(this.task.offsetSnapshotSinceLastFlush.isEmpty());
        this.task.maybeCheckpoint(false);
        Assert.assertEquals(Collections.singletonMap(this.partition1, 11000L), this.task.offsetSnapshotSinceLastFlush);
        this.task.maybeCheckpoint(false);
        Assert.assertEquals(Collections.singletonMap(this.partition1, 11000L), this.task.offsetSnapshotSinceLastFlush);
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldCheckpointOffsetsOnPostCommit() {
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.singletonMap(this.changelogPartition, 543L)).anyTimes();
        EasyMock.expectLastCall();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition1, 10543L)).andReturn(Collections.singletonMap(this.partition1, 12543L));
        EasyMock.replay(new Object[]{this.recordCollector, this.stateManager});
        this.task = createOptimizedStatefulTask(createConfig(), this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 345L)));
        this.task.process(100L);
        Assert.assertTrue(this.task.commitNeeded());
        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(false);
        Assert.assertEquals(Task.State.SUSPENDED, this.task.state());
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldThrowExceptionOnCloseCleanError() {
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.changelogPartition)).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.changelogPartition, 543L)).anyTimes();
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        EasyMock.replay(new Object[]{this.recordCollector, this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 543L)));
        this.task.process(100L);
        Assert.assertTrue(this.task.commitNeeded());
        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(true);
        Assert.assertThrows(ProcessorStateException.class, () -> {
            this.task.closeClean();
        });
        verifyCloseTaskMetric(0.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
        EasyMock.reset(new Object[]{this.stateManager});
        EasyMock.expect(this.stateManager.changelogPartitions()).andStubReturn(Collections.singleton(this.changelogPartition));
        this.stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.stateManager});
    }

    @Test
    public void shouldThrowOnCloseCleanFlushError() {
        this.stateManager.flush();
        EasyMock.expectLastCall();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.singletonMap(this.changelogPartition, 543L));
        this.stateManager.flushCache();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        this.stateManager.flush();
        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.recordCollector, this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 543L)));
        this.task.process(100L);
        StreamTask streamTask = this.task;
        streamTask.getClass();
        Assert.assertThrows(ProcessorStateException.class, streamTask::prepareCommit);
        Assert.assertEquals(Task.State.RUNNING, this.task.state());
        verifyCloseTaskMetric(0.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
        EasyMock.reset(new Object[]{this.stateManager});
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
    }

    @Test
    public void shouldThrowOnCloseCleanCheckpointError() {
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap());
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition1, 54300L));
        EasyMock.replay(new Object[]{this.recordCollector, this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.initializeIfNeeded();
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 54300L)));
        this.task.process(100L);
        Assert.assertTrue(this.task.commitNeeded());
        this.task.suspend();
        this.task.prepareCommit();
        Assert.assertThrows(ProcessorStateException.class, () -> {
            this.task.postCommit(true);
        });
        Assert.assertEquals(Task.State.SUSPENDED, this.task.state());
        verifyCloseTaskMetric(0.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
        EasyMock.reset(new Object[]{this.stateManager});
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        this.stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.stateManager});
    }

    @Test
    public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.initializeIfNeeded();
        this.task.suspend();
        this.task.closeDirty();
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldUnregisterMetricsInCloseClean() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.suspend();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.not(Matchers.empty()));
        this.task.closeClean();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    @Test
    public void shouldUnregisterMetricsInCloseDirty() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.suspend();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.not(Matchers.empty()));
        this.task.closeDirty();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    @Test
    public void shouldUnregisterMetricsAndCloseInPrepareRecycle() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        this.stateManager.recycle();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.suspend();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.not(Matchers.empty()));
        this.task.prepareRecycle();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
        MatcherAssert.assertThat(this.task.state(), Matchers.is(Task.State.CLOSED));
    }

    @Test
    public void shouldClearCommitStatusesInCloseDirty() {
        this.task = createSingleSourceStateless(createConfig("at_least_once", "0"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        Assert.assertTrue(this.task.process(0L));
        this.task.requestCommit();
        this.task.suspend();
        MatcherAssert.assertThat(Boolean.valueOf(this.task.commitNeeded()), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(this.task.commitRequested()), Matchers.is(true));
        this.task.closeDirty();
        MatcherAssert.assertThat(Boolean.valueOf(this.task.commitNeeded()), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(this.task.commitRequested()), Matchers.is(false));
    }

    @Test
    public void closeShouldBeIdempotent() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createOptimizedStatefulTask(createConfig("100"), this.consumer);
        this.task.suspend();
        this.task.closeClean();
        this.task.closeClean();
        this.task.closeDirty();
        EasyMock.reset(new Object[]{this.stateManager});
        EasyMock.replay(new Object[]{this.stateManager});
    }

    @Test
    public void shouldUpdatePartitions() {
        this.task = createStatelessTask(createConfig());
        HashSet hashSet = new HashSet(this.task.inputPartitions());
        hashSet.add(new TopicPartition("newTopic", 0));
        this.task.updateInputPartitions(hashSet, Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.source1.name(), Arrays.asList(AssignmentTestUtils.TP_1_NAME, "newTopic")), Utils.mkEntry(this.source2.name(), Collections.singletonList(AssignmentTestUtils.TP_2_NAME))}));
        MatcherAssert.assertThat(this.task.inputPartitions(), CoreMatchers.equalTo(hashSet));
    }

    @Test
    public void shouldThrowIfCleanClosingDirtyTask() {
        this.task = createSingleSourceStateless(createConfig("at_least_once", "0"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        Assert.assertTrue(this.task.process(0L));
        Assert.assertTrue(this.task.commitNeeded());
        Assert.assertThrows(TaskMigratedException.class, () -> {
            this.task.closeClean();
        });
    }

    @Test
    public void shouldThrowIfRecyclingDirtyTask() {
        this.task = createStatelessTask(createConfig());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        this.task.addRecords(this.partition2, Collections.singletonList(getConsumerRecordWithOffsetAsTimestamp(this.partition2, 0L)));
        this.task.process(0L);
        Assert.assertTrue(this.task.commitNeeded());
        Assert.assertThrows(TaskMigratedException.class, () -> {
            this.task.prepareRecycle();
        });
    }

    @Test
    public void shouldPrepareRecycleSuspendedTask() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        this.stateManager.recycle();
        EasyMock.expectLastCall().once();
        this.recordCollector.closeClean();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).once();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createStatefulTask(createConfig("100"), true);
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.task.prepareRecycle();
        });
        this.task.initializeIfNeeded();
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.task.prepareRecycle();
        });
        this.task.completeRestoration(set -> {
        });
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.task.prepareRecycle();
        });
        this.task.suspend();
        this.task.prepareRecycle();
        MatcherAssert.assertThat(this.task.state(), Matchers.is(Task.State.CLOSED));
        EasyMock.verify(new Object[]{this.stateManager, this.recordCollector});
    }

    @Test
    public void shouldAlwaysSuspendCreatedTasks() {
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStatefulTask(createConfig("100"), true);
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.CREATED));
        this.task.suspend();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    @Test
    public void shouldAlwaysSuspendRestoringTasks() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStatefulTask(createConfig("100"), true);
        this.task.initializeIfNeeded();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.RESTORING));
        this.task.suspend();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    @Test
    public void shouldAlwaysSuspendRunningTasks() {
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap());
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        this.task = createFaultyStatefulTask(createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.RUNNING));
        Assert.assertThrows(RuntimeException.class, () -> {
            this.task.suspend();
        });
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    @Test
    public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
        ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(this.taskId, createConfig("100"), this.stateManager, this.streamsMetrics, (ThreadCache) null);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, "test", "latest", this.time);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.replay(new Object[]{this.stateManager});
        ProcessorTopology withSources = withSources(Collections.emptyList(), Utils.mkMap(new Map.Entry[0]));
        MatcherAssert.assertThat(Assert.assertThrows(TopologyException.class, () -> {
            new StreamTask(this.taskId, this.partitions, withSources, this.consumer, new TopologyConfig((String) null, createConfig("100"), new Properties()).getTaskConfig(), streamsMetricsImpl, this.stateDirectory, this.cache, this.time, this.stateManager, this.recordCollector, processorContextImpl, this.logContext);
        }).getMessage(), CoreMatchers.equalTo("Invalid topology: Topic topic1 is unknown to the topology. This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order."));
    }

    @Test
    public void shouldInitTaskTimeoutAndEventuallyThrow() {
        this.task = createStatelessTask(createConfig());
        this.task.maybeInitTaskTimeoutOrThrow(0L, (Exception) null);
        this.task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).toMillis(), (Exception) null);
        MatcherAssert.assertThat(Assert.assertThrows(StreamsException.class, () -> {
            this.task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis(), (Exception) null);
        }).getCause(), Matchers.isA(TimeoutException.class));
    }

    @Test
    public void shouldClearTaskTimeout() {
        this.task = createStatelessTask(createConfig());
        this.task.maybeInitTaskTimeoutOrThrow(0L, (Exception) null);
        this.task.clearTaskTimeout();
        this.task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis(), (Exception) null);
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [org.apache.kafka.clients.consumer.ConsumerRecord[], long, java.lang.Object[]] */
    @Test
    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
        this.task = createStatelessTask(createConfig("at_least_once", "0", LogAndContinueExceptionHandler.class.getName()));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        ?? r0 = {getCorruptedConsumerRecordWithOffsetAsTimestamp((-1) + 1), getCorruptedConsumerRecordWithOffsetAsTimestamp(r0 + 1)};
        List asList = Arrays.asList(r0);
        this.consumer.addRecord((ConsumerRecord) asList.get(0));
        this.consumer.addRecord((ConsumerRecord) asList.get(1));
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, asList);
        Assert.assertTrue(this.task.process((long) r0));
        Assert.assertTrue(this.task.commitNeeded());
        MatcherAssert.assertThat(this.task.prepareCommit(), CoreMatchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(r0 + 1, new TopicPartitionMetadata(-1L, new ProcessorMetadata()).encode()))})));
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [org.apache.kafka.clients.consumer.ConsumerRecord[], long, java.lang.Object[]] */
    @Test
    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
        this.task = createStatelessTask(createConfig("at_least_once", "0", LogAndContinueExceptionHandler.class.getName()));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        ?? r0 = {getCorruptedConsumerRecordWithOffsetAsTimestamp((-1) + 1), getConsumerRecordWithOffsetAsTimestamp(this.partition1, r0 + 1)};
        List asList = Arrays.asList(r0);
        this.consumer.addRecord((ConsumerRecord) asList.get(0));
        this.consumer.addRecord((ConsumerRecord) asList.get(1));
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, asList);
        Assert.assertTrue(this.task.process((long) r0));
        Assert.assertTrue(this.task.commitNeeded());
        MatcherAssert.assertThat(this.task.prepareCommit(), CoreMatchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(r0 + 1, new TopicPartitionMetadata((long) r0, new ProcessorMetadata()).encode()))})));
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [org.apache.kafka.clients.consumer.ConsumerRecord[], long, java.lang.Object[]] */
    @Test
    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
        this.task = createStatelessTask(createConfig("at_least_once", "0", LogAndContinueExceptionHandler.class.getName()));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        ?? r0 = {getConsumerRecordWithOffsetAsTimestamp(this.partition1, (-1) + 1), getCorruptedConsumerRecordWithOffsetAsTimestamp(r0 + 1)};
        List asList = Arrays.asList(r0);
        this.consumer.addRecord((ConsumerRecord) asList.get(0));
        this.consumer.addRecord((ConsumerRecord) asList.get(1));
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, asList);
        Assert.assertTrue(this.task.process((long) r0));
        Assert.assertTrue(this.task.commitNeeded());
        MatcherAssert.assertThat(this.task.prepareCommit(), CoreMatchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(1L, new TopicPartitionMetadata(0L, new ProcessorMetadata()).encode()))})));
        Assert.assertTrue(this.task.process((long) r0));
        Assert.assertTrue(this.task.commitNeeded());
        MatcherAssert.assertThat(this.task.prepareCommit(), CoreMatchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, new OffsetAndMetadata(2L, new TopicPartitionMetadata(0L, new ProcessorMetadata()).encode()))})));
    }

    @Test
    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
        ProcessorStateManager mockStateManager = mockStateManager();
        this.recordCollector = (RecordCollector) Mockito.mock(RecordCollectorImpl.class);
        this.task = createStatefulTask(createConfig("at_least_once", "100"), true, mockStateManager);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        ((ProcessorStateManager) Mockito.verify(mockStateManager)).checkpoint();
    }

    @Test
    public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
        ProcessorStateManager mockStateManager = mockStateManager();
        this.recordCollector = (RecordCollector) Mockito.mock(RecordCollectorImpl.class);
        this.task = createStatefulTask(createConfig("exactly_once_v2", "100"), true, mockStateManager);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(set -> {
        });
        ((ProcessorStateManager) Mockito.verify(mockStateManager, Mockito.never())).checkpoint();
        ((ProcessorStateManager) Mockito.verify(mockStateManager, Mockito.never())).changelogOffsets();
        ((RecordCollector) Mockito.verify(this.recordCollector, Mockito.never())).offsets();
    }

    private ProcessorStateManager mockStateManager() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        ((ProcessorStateManager) Mockito.doReturn(Task.TaskType.ACTIVE).when(processorStateManager)).taskType();
        ((ProcessorStateManager) Mockito.doReturn(this.taskId).when(processorStateManager)).taskId();
        return processorStateManager;
    }

    private List<MetricName> getTaskMetrics() {
        return (List) this.metrics.metrics().keySet().stream().filter(metricName -> {
            return metricName.tags().containsKey("task-id");
        }).collect(Collectors.toList());
    }

    private StreamTask createOptimizedStatefulTask(StreamsConfig streamsConfig, org.apache.kafka.clients.consumer.Consumer<byte[], byte[]> consumer) {
        return new StreamTask(this.taskId, Utils.mkSet(new TopicPartition[]{this.partition1}), ProcessorTopologyFactories.with(Collections.singletonList(this.source1), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(AssignmentTestUtils.TP_1_NAME, this.source1)}), Collections.singletonList(new MockKeyValueStore("store", true)), Collections.singletonMap("store", AssignmentTestUtils.TP_1_NAME)), consumer, new TopologyConfig((String) null, streamsConfig, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, this.time, this.stateManager, this.recordCollector, new ProcessorContextImpl(this.taskId, streamsConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null), this.logContext);
    }

    private StreamTask createDisconnectedTask(StreamsConfig streamsConfig) {
        return new StreamTask(this.taskId, this.partitions, ProcessorTopologyFactories.with(Arrays.asList(this.source1, this.source2), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(AssignmentTestUtils.TP_1_NAME, this.source1), Utils.mkEntry(AssignmentTestUtils.TP_2_NAME, this.source2)}), Collections.singletonList(new MockKeyValueStore("store", false)), Collections.emptyMap()), new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.7
            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set) {
                throw new TimeoutException("KABOOM!");
            }
        }, new TopologyConfig((String) null, streamsConfig, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, this.time, this.stateManager, this.recordCollector, new ProcessorContextImpl(this.taskId, streamsConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null), this.logContext);
    }

    private StreamTask createFaultyStatefulTask(StreamsConfig streamsConfig) {
        return new StreamTask(this.taskId, this.partitions, ProcessorTopologyFactories.with(Arrays.asList(this.source1, this.source3), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(AssignmentTestUtils.TP_1_NAME, this.source1), Utils.mkEntry(AssignmentTestUtils.TP_2_NAME, this.source3)}), Collections.singletonList(this.stateStore), Collections.emptyMap()), this.consumer, new TopologyConfig((String) null, streamsConfig, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, this.time, this.stateManager, this.recordCollector, new ProcessorContextImpl(this.taskId, streamsConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null), this.logContext);
    }

    private StreamTask createStatefulTask(StreamsConfig streamsConfig, boolean z) {
        return createStatefulTask(streamsConfig, z, this.stateManager);
    }

    private StreamTask createStatefulTask(StreamsConfig streamsConfig, boolean z, ProcessorStateManager processorStateManager) {
        return new StreamTask(this.taskId, this.partitions, ProcessorTopologyFactories.with(Arrays.asList(this.source1, this.source2), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(AssignmentTestUtils.TP_1_NAME, this.source1), Utils.mkEntry(AssignmentTestUtils.TP_2_NAME, this.source2)}), Collections.singletonList(new MockKeyValueStore("store", z)), z ? Collections.singletonMap("store", "store-changelog") : Collections.emptyMap()), this.consumer, new TopologyConfig((String) null, streamsConfig, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, this.time, processorStateManager, this.recordCollector, new ProcessorContextImpl(this.taskId, streamsConfig, processorStateManager, this.streamsMetrics, (ThreadCache) null), this.logContext);
    }

    private StreamTask createSingleSourceStateless(StreamsConfig streamsConfig, String str) {
        ProcessorTopology withSources = withSources(Arrays.asList(this.source1, this.processorStreamTime, this.processorSystemTime), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(AssignmentTestUtils.TP_1_NAME, this.source1)}));
        this.source1.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        return new StreamTask(this.taskId, Utils.mkSet(new TopicPartition[]{this.partition1}), withSources, this.consumer, new TopologyConfig((String) null, streamsConfig, new Properties()).getTaskConfig(), new StreamsMetricsImpl(this.metrics, "test", str, this.time), this.stateDirectory, this.cache, this.time, this.stateManager, this.recordCollector, new ProcessorContextImpl(this.taskId, streamsConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null), this.logContext);
    }

    private StreamTask createStatelessTask(StreamsConfig streamsConfig) {
        ProcessorTopology withSources = withSources(Arrays.asList(this.source1, this.source2, this.processorStreamTime, this.processorSystemTime), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(AssignmentTestUtils.TP_1_NAME, this.source1), Utils.mkEntry(AssignmentTestUtils.TP_2_NAME, this.source2)}));
        this.source1.addChild(this.processorStreamTime);
        this.source2.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source2.addChild(this.processorSystemTime);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        return new StreamTask(this.taskId, this.partitions, withSources, this.consumer, new TopologyConfig((String) null, streamsConfig, new Properties()).getTaskConfig(), new StreamsMetricsImpl(this.metrics, "test", "latest", this.time), this.stateDirectory, this.cache, this.time, this.stateManager, this.recordCollector, new ProcessorContextImpl(this.taskId, streamsConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null), this.logContext);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private StreamTask createStatelessTaskWithForwardingTopology(SourceNode<Integer, Integer> sourceNode) {
        ProcessorTopology withSources = withSources(Arrays.asList(sourceNode, this.processorStreamTime), Collections.singletonMap(AssignmentTestUtils.TP_1_NAME, sourceNode));
        sourceNode.addChild(this.processorStreamTime);
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.expect(this.recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager, this.recordCollector});
        StreamsConfig createConfig = createConfig();
        return new StreamTask(this.taskId, Collections.singleton(this.partition1), withSources, this.consumer, new TopologyConfig((String) null, createConfig, new Properties()).getTaskConfig(), new StreamsMetricsImpl(this.metrics, "test", "latest", this.time), this.stateDirectory, this.cache, this.time, this.stateManager, this.recordCollector, new ProcessorContextImpl(this.taskId, createConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null), this.logContext);
    }

    private void createTimeoutTask(String str) {
        EasyMock.replay(new Object[]{this.stateManager});
        ProcessorTopology withSources = withSources(Collections.singletonList(this.timeoutSource), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(AssignmentTestUtils.TP_1_NAME, this.timeoutSource)}));
        StreamsConfig createConfig = createConfig(str, "0");
        this.task = new StreamTask(this.taskId, Utils.mkSet(new TopicPartition[]{this.partition1}), withSources, this.consumer, new TopologyConfig((String) null, createConfig, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, this.time, this.stateManager, this.recordCollector, new ProcessorContextImpl(this.taskId, createConfig, this.stateManager, this.streamsMetrics, (ThreadCache) null), this.logContext);
    }

    private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(TopicPartition topicPartition, long j, int i) {
        return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), j, j, TimestampType.CREATE_TIME, 0, 0, this.recordKey, this.intSerializer.serialize((String) null, Integer.valueOf(i)), new RecordHeaders(), Optional.empty());
    }

    private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(TopicPartition topicPartition, long j) {
        return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), j, j, TimestampType.CREATE_TIME, 0, 0, this.recordKey, this.recordValue, new RecordHeaders(), Optional.empty());
    }

    private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(Integer num, long j) {
        return new ConsumerRecord<>(AssignmentTestUtils.TP_1_NAME, 1, j, j, TimestampType.CREATE_TIME, 0, 0, new IntegerSerializer().serialize(AssignmentTestUtils.TP_1_NAME, num), this.recordValue, new RecordHeaders(), Optional.empty());
    }

    private ConsumerRecord<byte[], byte[]> getCorruptedConsumerRecordWithOffsetAsTimestamp(long j) {
        return new ConsumerRecord<>(AssignmentTestUtils.TP_1_NAME, 1, j, j, TimestampType.CREATE_TIME, -1, -1, new byte[0], "I am not an integer.".getBytes(), new RecordHeaders(), Optional.empty());
    }

    private MetricName setupCloseTaskMetric() {
        MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
        this.streamsMetrics.threadLevelSensor(this.threadId, "task-closed", Sensor.RecordingLevel.INFO, new Sensor[0]).add(metricName, new CumulativeSum());
        return metricName;
    }

    private void verifyCloseTaskMetric(double d, StreamsMetricsImpl streamsMetricsImpl, MetricName metricName) {
        KafkaMetric kafkaMetric = (KafkaMetric) streamsMetricsImpl.metrics().get(metricName);
        MatcherAssert.assertThat(Double.valueOf(kafkaMetric.measurable().measure(kafkaMetric.config(), System.currentTimeMillis())), CoreMatchers.equalTo(Double.valueOf(d)));
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.kafka.streams.processor.internals.StreamTaskTest.access$002(org.apache.kafka.streams.processor.internals.StreamTaskTest, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$002(org.apache.kafka.streams.processor.internals.StreamTaskTest r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.punctuatedAt = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.streams.processor.internals.StreamTaskTest.access$002(org.apache.kafka.streams.processor.internals.StreamTaskTest, long):long");
    }

    static {
    }
}
