package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest.class */
public class StreamThreadTest {
    private static final String APPLICATION_ID = "stream-thread-test";
    private static final UUID PROCESS_ID = UUID.fromString("87bf53a8-54f2-485f-a4b6-acdbec0a8b3d");
    private static final String CLIENT_ID = "stream-thread-test-" + PROCESS_ID;
    private StreamsMetadataState streamsMetadataState;
    private InternalTopologyBuilder internalTopologyBuilder;
    private final int threadIdx = 1;
    private final Metrics metrics = new Metrics();
    private final MockTime mockTime = new MockTime();
    private final String stateDir = TestUtils.tempDirectory().getPath();
    private final MockClientSupplier clientSupplier = new MockClientSupplier();
    private final StreamsConfig config = new StreamsConfig(configProps(false));
    private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
    private final ChangelogReader changelogReader = new MockChangelogReader();
    private final StateDirectory stateDirectory = new StateDirectory(this.config, this.mockTime, true);
    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(1, 1);

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest$EasyMockConsumerClientSupplier.class */
    private static class EasyMockConsumerClientSupplier extends MockClientSupplier {
        final Consumer<byte[], byte[]> mockConsumer;
        final Map<String, Object> consumerConfigs = new HashMap();

        EasyMockConsumerClientSupplier(Consumer<byte[], byte[]> consumer) {
            this.mockConsumer = consumer;
        }

        @Override // org.apache.kafka.test.MockClientSupplier
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> map) {
            this.consumerConfigs.putAll(map);
            return this.mockConsumer;
        }

        AtomicLong nextRebalanceMs() {
            return (AtomicLong) this.consumerConfigs.get("__next.probing.rebalance.ms__");
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest$StateListenerStub.class */
    private static class StateListenerStub implements StreamThread.StateListener {
        int numChanges;
        ThreadStateTransitionValidator oldState;
        ThreadStateTransitionValidator newState;

        private StateListenerStub() {
            this.numChanges = 0;
            this.oldState = null;
            this.newState = null;
        }

        public void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2) {
            this.numChanges++;
            if (this.newState != null && this.newState != threadStateTransitionValidator2) {
                throw new RuntimeException("State mismatch " + threadStateTransitionValidator2 + " different from " + this.newState);
            }
            this.oldState = threadStateTransitionValidator2;
            this.newState = threadStateTransitionValidator;
        }
    }

    @Before
    public void setUp() {
        Thread.currentThread().setName(CLIENT_ID + "-StreamThread-1");
        this.internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(this.internalStreamsBuilder);
        this.internalTopologyBuilder.setApplicationId(APPLICATION_ID);
        this.streamsMetadataState = new StreamsMetadataState(this.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
    }

    private Properties configProps(boolean z) {
        Map.Entry[] entryArr = new Map.Entry[6];
        entryArr[0] = Utils.mkEntry("application.id", APPLICATION_ID);
        entryArr[1] = Utils.mkEntry("bootstrap.servers", "localhost:2171");
        entryArr[2] = Utils.mkEntry("buffered.records.per.partition", "3");
        entryArr[3] = Utils.mkEntry("default.timestamp.extractor", MockTimestampExtractor.class.getName());
        entryArr[4] = Utils.mkEntry("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        entryArr[5] = Utils.mkEntry("processing.guarantee", z ? "exactly_once" : "at_least_once");
        return Utils.mkProperties(Utils.mkMap(entryArr));
    }

    private Cluster createCluster() {
        Node node = new Node(-1, "localhost", 8121);
        return new Cluster("mockClusterId", Collections.singletonList(node), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), node);
    }

    private StreamThread createStreamThread(String str, StreamsConfig streamsConfig, boolean z) {
        if (z) {
            this.clientSupplier.setApplicationIdForProducer(APPLICATION_ID);
        }
        this.clientSupplier.setCluster(createCluster());
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, streamsConfig.getString("built.in.metrics.version"));
        this.internalTopologyBuilder.buildTopology();
        return StreamThread.create(this.internalTopologyBuilder, streamsConfig, this.clientSupplier, this.clientSupplier.getAdmin(streamsConfig.getAdminConfigs(str)), PROCESS_ID, str, streamsMetricsImpl, this.mockTime, this.streamsMetadataState, 0L, this.stateDirectory, new MockStateRestoreListener(), 1);
    }

    @Test
    public void shouldChangeStateInRebalanceListener() {
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, this.config, false);
        StateListenerStub stateListenerStub = new StateListenerStub();
        createStreamThread.setStateListener(stateListenerStub);
        Assert.assertEquals(createStreamThread.state(), StreamThread.State.CREATED);
        ConsumerRebalanceListener rebalanceListener = createStreamThread.rebalanceListener();
        createStreamThread.setState(StreamThread.State.STARTING);
        rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        Assert.assertEquals(createStreamThread.state(), StreamThread.State.PARTITIONS_REVOKED);
        List singletonList = Collections.singletonList(this.t1p1);
        MockConsumer mainConsumer = createStreamThread.mainConsumer();
        mainConsumer.assign(singletonList);
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        rebalanceListener.onPartitionsAssigned(singletonList);
        createStreamThread.runOnce();
        Assert.assertEquals(createStreamThread.state(), StreamThread.State.RUNNING);
        Assert.assertEquals(4L, stateListenerStub.numChanges);
        Assert.assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListenerStub.oldState);
        createStreamThread.shutdown();
        Assert.assertSame(StreamThread.State.PENDING_SHUTDOWN, createStreamThread.state());
    }

    @Test
    public void shouldChangeStateAtStartClose() throws Exception {
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, this.config, false);
        createStreamThread.setStateListener(new StateListenerStub());
        createStreamThread.start();
        TestUtils.waitForCondition(() -> {
            return createStreamThread.state() == StreamThread.State.STARTING;
        }, 10000L, "Thread never started.");
        createStreamThread.shutdown();
        TestUtils.waitForCondition(() -> {
            return createStreamThread.state() == StreamThread.State.DEAD;
        }, 10000L, "Thread never shut down.");
        createStreamThread.shutdown();
        Assert.assertEquals(createStreamThread.state(), StreamThread.State.DEAD);
    }

    @Test
    public void shouldCreateMetricsAtStartupWithBuiltInMetricsVersionLatest() {
        shouldCreateMetricsAtStartup("latest");
    }

    @Test
    public void shouldCreateMetricsAtStartupWithBuiltInMetricsVersion0100To24() {
        shouldCreateMetricsAtStartup("0.10.0-2.4");
    }

    private void shouldCreateMetricsAtStartup(String str) {
        Properties configProps = configProps(false);
        configProps.setProperty("built.in.metrics.version", str);
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps), false);
        String groupName = getGroupName(str);
        Map singletonMap = Collections.singletonMap(getThreadTagKey(str), createStreamThread.getName());
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-total", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-ratio", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-avg", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-max", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-rate", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-total", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-ratio", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-records-avg", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-records-max", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-avg", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-max", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-rate", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-total", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-ratio", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-records-avg", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-records-max", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-avg", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-max", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-rate", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-total", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-ratio", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-rate", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-total", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-rate", groupName, "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-total", groupName, "", singletonMap)));
        if (str.equals("0.10.0-2.4")) {
            Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-rate", groupName, "", singletonMap)));
            Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-total", groupName, "", singletonMap)));
        } else {
            Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-rate", groupName, "", singletonMap)));
            Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-total", groupName, "", singletonMap)));
        }
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("task-id", "all"), Utils.mkEntry(getThreadTagKey(str), createStreamThread.getName())});
        if (str.equals("0.10.0-2.4")) {
            Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-task-metrics", "", mkMap)));
        } else {
            Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", "stream-task-metrics", "", mkMap)));
            Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", "stream-task-metrics", "", mkMap)));
            Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-task-metrics", "", mkMap)));
        }
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.contextChange(new KafkaMetricsContext("kafka.streams"));
        this.metrics.addReporter(jmxReporter);
        Assert.assertEquals(CLIENT_ID + "-StreamThread-1", createStreamThread.getName());
        Assert.assertTrue(jmxReporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s", groupName, getThreadTagKey(str), createStreamThread.getName())));
        if (str.equals("0.10.0-2.4")) {
            Assert.assertTrue(jmxReporter.containsMbean(String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=all", getThreadTagKey(str), createStreamThread.getName())));
        } else {
            Assert.assertFalse(jmxReporter.containsMbean(String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=all", getThreadTagKey(str), createStreamThread.getName())));
        }
    }

    private String getGroupName(String str) {
        return str.equals("0.10.0-2.4") ? "stream-metrics" : "stream-thread-metrics";
    }

    private String getThreadTagKey(String str) {
        return str.equals("0.10.0-2.4") ? "client-id" : "thread-id";
    }

    @Test
    public void shouldNotCommitBeforeTheCommitInterval() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        Consumer<byte[], byte[]> consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager mockTaskManagerCommit = mockTaskManagerCommit(consumer, 1, 1);
        StreamThread streamThread = new StreamThread(this.mockTime, streamsConfig, (Admin) null, consumer, consumer, (ChangelogReader) null, (String) null, mockTaskManagerCommit, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE));
        streamThread.setNow(this.mockTime.milliseconds());
        streamThread.maybeCommit();
        this.mockTime.sleep(990L);
        streamThread.setNow(this.mockTime.milliseconds());
        streamThread.maybeCommit();
        EasyMock.verify(new Object[]{mockTaskManagerCommit});
    }

    @Test
    public void shouldEnforceRebalanceAfterNextScheduledProbingRebalanceTime() throws InterruptedException {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        this.internalTopologyBuilder.buildTopology();
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, streamsConfig.getString("built.in.metrics.version"));
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        EasyMockConsumerClientSupplier easyMockConsumerClientSupplier = new EasyMockConsumerClientSupplier(consumer);
        easyMockConsumerClientSupplier.setCluster(createCluster());
        StreamThread create = StreamThread.create(this.internalTopologyBuilder, streamsConfig, easyMockConsumerClientSupplier, easyMockConsumerClientSupplier.getAdmin(streamsConfig.getAdminConfigs(CLIENT_ID)), PROCESS_ID, CLIENT_ID, streamsMetricsImpl, this.mockTime, this.streamsMetadataState, 0L, this.stateDirectory, new MockStateRestoreListener(), 1);
        consumer.enforceRebalance();
        EasyMock.replay(new Object[]{consumer});
        easyMockConsumerClientSupplier.nextRebalanceMs().set(this.mockTime.milliseconds() - 1);
        create.start();
        TestUtils.waitForCondition(() -> {
            return create.state() == StreamThread.State.STARTING;
        }, 10000L, "Thread never started.");
        TestUtils.retryOnExceptionWithTimeout(() -> {
            EasyMock.verify(new Object[]{consumer});
        });
        create.shutdown();
        TestUtils.waitForCondition(() -> {
            return create.state() == StreamThread.State.DEAD;
        }, 10000L, "Thread never shut down.");
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void shouldRespectNumIterationsInMainLoop() {
        MockProcessor mockProcessor = new MockProcessor(PunctuationType.WALL_CLOCK_TIME, 10L);
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.internalTopologyBuilder.addProcessor("processor1", () -> {
            return mockProcessor;
        }, new String[]{"source1"});
        this.internalTopologyBuilder.addProcessor("processor2", () -> {
            return new MockProcessor(PunctuationType.STREAM_TIME, 10L);
        }, new String[]{"source1"});
        Properties properties = new Properties();
        properties.put("commit.interval.ms", 100L);
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(StreamsTestUtils.getStreamsConfig(APPLICATION_ID, "localhost:2171", Serdes.ByteArraySerde.class.getName(), Serdes.ByteArraySerde.class.getName(), properties)), false);
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
        TaskId taskId = new TaskId(0, this.t1p1.partition());
        Set singleton = Collections.singleton(this.t1p1);
        createStreamThread.taskManager().handleAssignment(Collections.singletonMap(taskId, singleton), Collections.emptyMap());
        MockConsumer mainConsumer = createStreamThread.mainConsumer();
        mainConsumer.assign(Collections.singleton(this.t1p1));
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener().onPartitionsAssigned(singleton);
        createStreamThread.runOnce();
        long j = (-1) + 1;
        addRecord(mainConsumer, j, 0L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(1));
        addRecord(mainConsumer, j + 1, 1L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(2));
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(2));
        this.mockTime.sleep(11L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(2));
        this.mockTime.sleep(11L);
        addRecord(mainConsumer, this + 1, 5L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(1));
        addRecord(mainConsumer, this + 1, 5L);
        addRecord(mainConsumer, this + 1, 6L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(3));
        addRecord(mainConsumer, this + 1, 11L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(1));
        addRecord(mainConsumer, this + 1, 12L);
        addRecord(mainConsumer, this + 1, 13L);
        addRecord(mainConsumer, this + 1, 14L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(3));
        mockProcessor.requestCommit();
        addRecord(mainConsumer, this + 1, 15L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(1));
        addRecord(mainConsumer, this + 1, 15L);
        addRecord(mainConsumer, this + 1, 16L);
        addRecord(mainConsumer, this + 1, 17L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(3));
        this.mockTime.sleep(90L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(3));
        this.mockTime.sleep(90L);
        addRecord(mainConsumer, this + 1, 18L);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.currentNumIterations()), CoreMatchers.equalTo(1));
    }

    @Test
    public void shouldNotCauseExceptionIfNothingCommitted() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        Consumer<byte[], byte[]> consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager mockTaskManagerCommit = mockTaskManagerCommit(consumer, 1, 0);
        StreamThread streamThread = new StreamThread(this.mockTime, streamsConfig, (Admin) null, consumer, consumer, (ChangelogReader) null, (String) null, mockTaskManagerCommit, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE));
        streamThread.setNow(this.mockTime.milliseconds());
        streamThread.maybeCommit();
        this.mockTime.sleep(990L);
        streamThread.setNow(this.mockTime.milliseconds());
        streamThread.maybeCommit();
        EasyMock.verify(new Object[]{mockTaskManagerCommit});
    }

    @Test
    public void shouldCommitAfterCommitInterval() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(100L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest");
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        StreamThread streamThread = new StreamThread(this.mockTime, streamsConfig, (Admin) null, consumer, consumer, this.changelogReader, (String) null, new TaskManager(null, null, null, null, null, null, null, null, null) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.1
            int commit(Collection<Task> collection) {
                atomicBoolean.set(true);
                StreamThreadTest.this.mockTime.sleep(10L);
                return 1;
            }
        }, streamsMetricsImpl, this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE));
        streamThread.setNow(this.mockTime.milliseconds());
        streamThread.maybeCommit();
        Assert.assertTrue(atomicBoolean.get());
        this.mockTime.sleep(100L);
        atomicBoolean.set(false);
        streamThread.setNow(this.mockTime.milliseconds());
        streamThread.maybeCommit();
        Assert.assertFalse(atomicBoolean.get());
        this.mockTime.sleep(1L);
        atomicBoolean.set(false);
        streamThread.setNow(this.mockTime.milliseconds());
        streamThread.maybeCommit();
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void shouldRecordCommitLatency() {
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        EasyMock.expect(consumer.poll((Duration) EasyMock.anyObject())).andStubReturn(new ConsumerRecords(Collections.emptyMap()));
        Task task = (Task) EasyMock.niceMock(Task.class);
        EasyMock.expect(task.id()).andStubReturn(this.task1);
        EasyMock.expect(task.inputPartitions()).andStubReturn(Collections.singleton(this.t1p1));
        ActiveTaskCreator activeTaskCreator = (ActiveTaskCreator) EasyMock.mock(ActiveTaskCreator.class);
        EasyMock.expect(activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.anyObject())).andStubReturn(Collections.singleton(task));
        EasyMock.expect(activeTaskCreator.producerClientIds()).andStubReturn(Collections.singleton("producerClientId"));
        EasyMock.replay(new Object[]{consumer, task, activeTaskCreator});
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest");
        TaskManager taskManager = new TaskManager(null, null, null, activeTaskCreator, null, this.internalTopologyBuilder, null, null, null) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.2
            int commit(Collection<Task> collection) {
                StreamThreadTest.this.mockTime.sleep(10L);
                return 1;
            }
        };
        taskManager.setMainConsumer(consumer);
        StreamThread streamThread = new StreamThread(this.mockTime, this.config, (Admin) null, consumer, consumer, this.changelogReader, (String) null, taskManager, streamsMetricsImpl, this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE));
        streamThread.updateThreadMetadata("adminClientId");
        streamThread.setState(StreamThread.State.STARTING);
        HashMap hashMap = new HashMap();
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        streamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        streamThread.rebalanceListener().onPartitionsAssigned(Collections.singleton(this.t1p1));
        Assert.assertTrue(Double.isNaN(((Double) ((Metric) streamsMetricsImpl.metrics().get(new MetricName("commit-latency-max", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue()).doubleValue()));
        Assert.assertTrue(Double.isNaN(((Double) ((Metric) streamsMetricsImpl.metrics().get(new MetricName("commit-latency-avg", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue()).doubleValue()));
        streamThread.runOnce();
        MatcherAssert.assertThat(((Metric) streamsMetricsImpl.metrics().get(new MetricName("commit-latency-max", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(((Metric) streamsMetricsImpl.metrics().get(new MetricName("commit-latency-avg", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
    }

    @Test
    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, this.config, false);
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = createStreamThread.mainConsumer();
        mainConsumer.assign(arrayList);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.t1p1, 0L);
        hashMap2.put(this.t1p2, 0L);
        mainConsumer.updateBeginningOffsets(hashMap2);
        createStreamThread.rebalanceListener().onPartitionsAssigned(new HashSet(arrayList));
        Assert.assertEquals(1L, this.clientSupplier.producers.size());
        Producer producer = this.clientSupplier.producers.get(0);
        Iterator it = createStreamThread.activeTasks().iterator();
        while (it.hasNext()) {
            Assert.assertSame(producer, ((Task) it.next()).recordCollector().producer());
        }
        Assert.assertSame(this.clientSupplier.consumer, createStreamThread.mainConsumer());
        Assert.assertSame(this.clientSupplier.restoreConsumer, createStreamThread.restoreConsumer());
    }

    @Test
    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = createStreamThread.mainConsumer();
        mainConsumer.assign(arrayList);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.t1p1, 0L);
        hashMap2.put(this.t1p2, 0L);
        mainConsumer.updateBeginningOffsets(hashMap2);
        createStreamThread.rebalanceListener().onPartitionsAssigned(new HashSet(arrayList));
        createStreamThread.runOnce();
        Assert.assertEquals(createStreamThread.activeTasks().size(), this.clientSupplier.producers.size());
        Assert.assertSame(this.clientSupplier.consumer, createStreamThread.mainConsumer());
        Assert.assertSame(this.clientSupplier.restoreConsumer, createStreamThread.restoreConsumer());
    }

    @Test
    public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
        createStreamThread.start();
        TestUtils.waitForCondition(() -> {
            return createStreamThread.state() == StreamThread.State.STARTING;
        }, 10000L, "Thread never started.");
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        createStreamThread.taskManager().handleRebalanceStart(Collections.singleton("topic1"));
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        createStreamThread.shutdown();
        Assert.assertFalse(createStreamThread.isRunning());
        Thread.sleep(1000L);
        Assert.assertEquals(Utils.mkSet(new TaskId[]{this.task1, this.task2}), createStreamThread.taskManager().activeTaskIds());
        Assert.assertEquals(StreamThread.State.PENDING_SHUTDOWN, createStreamThread.state());
        createStreamThread.rebalanceListener().onPartitionsAssigned(arrayList);
        TestUtils.waitForCondition(() -> {
            return createStreamThread.state() == StreamThread.State.DEAD;
        }, 10000L, "Thread never shut down.");
        Assert.assertEquals(Collections.emptySet(), createStreamThread.taskManager().activeTaskIds());
    }

    @Test
    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
        createStreamThread.start();
        TestUtils.waitForCondition(() -> {
            return createStreamThread.state() == StreamThread.State.STARTING;
        }, 10000L, "Thread never started.");
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        createStreamThread.rebalanceListener().onPartitionsAssigned(arrayList);
        createStreamThread.shutdown();
        TestUtils.waitForCondition(() -> {
            return createStreamThread.state() == StreamThread.State.DEAD;
        }, 10000L, "Thread never shut down.");
        Iterator it = createStreamThread.activeTasks().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((Task) it.next()).recordCollector().producer().closed());
        }
    }

    @Test
    public void shouldShutdownTaskManagerOnClose() {
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{taskManager, consumer});
        StreamThread updateThreadMetadata = new StreamThread(this.mockTime, this.config, (Admin) null, consumer, consumer, (ChangelogReader) null, (String) null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE)).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        updateThreadMetadata.setStateListener((thread, threadStateTransitionValidator, threadStateTransitionValidator2) -> {
            if (threadStateTransitionValidator2 == StreamThread.State.CREATED && threadStateTransitionValidator == StreamThread.State.STARTING) {
                updateThreadMetadata.shutdown();
            }
        });
        updateThreadMetadata.run();
        EasyMock.verify(new Object[]{taskManager});
    }

    @Test
    public void shouldNotReturnDataAfterTaskMigrated() {
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        this.internalTopologyBuilder = (InternalTopologyBuilder) EasyMock.createNiceMock(InternalTopologyBuilder.class);
        EasyMock.expect(this.internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList("topic1")).times(2);
        final MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.LATEST);
        mockConsumer.subscribe(Collections.singletonList("topic1"), new MockRebalanceListener());
        mockConsumer.rebalance(Collections.singletonList(this.t1p1));
        mockConsumer.updateEndOffsets(Collections.singletonMap(this.t1p1, 10L));
        mockConsumer.seekToEnd(Collections.singletonList(this.t1p1));
        MockChangelogReader mockChangelogReader = new MockChangelogReader() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.3
            @Override // org.apache.kafka.streams.processor.internals.MockChangelogReader
            public void restore() {
                mockConsumer.addRecord(new ConsumerRecord("topic1", 1, 11L, new byte[0], new byte[0]));
                mockConsumer.addRecord(new ConsumerRecord("topic1", 1, 12L, new byte[1], new byte[0]));
                throw new TaskMigratedException("Changelog restore found task migrated", new RuntimeException("restore task migrated"));
            }
        };
        taskManager.handleLostAll();
        EasyMock.replay(new Object[]{taskManager, this.internalTopologyBuilder});
        StreamThread updateThreadMetadata = new StreamThread(this.mockTime, this.config, (Admin) null, mockConsumer, mockConsumer, mockChangelogReader, (String) null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE)).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        updateThreadMetadata.getClass();
        IllegalStateException illegalStateException = (IllegalStateException) Assert.assertThrows(IllegalStateException.class, updateThreadMetadata::run);
        EasyMock.verify(new Object[]{taskManager});
        Assert.assertEquals("No current assignment for partition topic1-1", illegalStateException.getMessage());
        Assert.assertFalse(mockConsumer.shouldRebalance());
    }

    @Test
    public void shouldShutdownTaskManagerOnCloseWithoutStart() {
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{taskManager, consumer});
        new StreamThread(this.mockTime, this.config, (Admin) null, consumer, consumer, (ChangelogReader) null, (String) null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE)).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID)).shutdown();
        EasyMock.verify(new Object[]{taskManager});
    }

    @Test
    public void shouldOnlyShutdownOnce() {
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{taskManager, consumer});
        StreamThread updateThreadMetadata = new StreamThread(this.mockTime, this.config, (Admin) null, consumer, consumer, (ChangelogReader) null, (String) null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE)).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        updateThreadMetadata.shutdown();
        updateThreadMetadata.run();
        EasyMock.verify(new Object[]{taskManager});
    }

    @Test
    public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic"});
        this.internalTopologyBuilder.addSink("out", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"name"});
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, this.config, false);
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().handleAssignment(Collections.emptyMap(), hashMap);
        createStreamThread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
    }

    @Test
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhileProcessing() throws Exception {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("sink", "dummyTopic", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"source"});
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.consumer;
        mockConsumer.updatePartitions("topic1", Collections.singletonList(new PartitionInfo("topic1", 1, (Node) null, (Node[]) null, (Node[]) null)));
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = createStreamThread.mainConsumer();
        mainConsumer.assign(arrayList);
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener().onPartitionsAssigned(arrayList);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.activeTasks().size()), CoreMatchers.equalTo(1));
        MockProducer<byte[], byte[]> mockProducer = this.clientSupplier.producers.get(0);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(arrayList.iterator().next(), 0L));
        mockConsumer.unsubscribe();
        mockConsumer.assign(new HashSet(arrayList));
        mockConsumer.addRecord(new ConsumerRecord("topic1", 1, 0L, new byte[0], new byte[0]));
        this.mockTime.sleep(this.config.getLong("commit.interval.ms").longValue() + 1);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(mockProducer.history().size()), CoreMatchers.equalTo(1));
        this.mockTime.sleep(this.config.getLong("commit.interval.ms").longValue() + 1);
        TestUtils.waitForCondition(() -> {
            return mockProducer.commitCount() == 1;
        }, "StreamsThread did not commit transaction.");
        mockProducer.fenceProducer();
        this.mockTime.sleep(this.config.getLong("commit.interval.ms").longValue() + 1);
        mockConsumer.addRecord(new ConsumerRecord("topic1", 1, 1L, new byte[0], new byte[0]));
        try {
            createStreamThread.runOnce();
            Assert.fail("Should have thrown TaskMigratedException");
        } catch (KafkaException e) {
            Assert.assertTrue(e instanceof TaskMigratedException);
            Assert.assertTrue("StreamsThread removed the fenced zombie task already, should wait for rebalance to close all zombies together.", createStreamThread.activeTasks().stream().anyMatch(task -> {
                return task.id().equals(this.task1);
            }));
        }
        MatcherAssert.assertThat(Long.valueOf(mockProducer.commitCount()), CoreMatchers.equalTo(1L));
    }

    @Test
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks() {
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("out", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"name"});
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer) createStreamThread.mainConsumer();
        mockConsumer.assign(arrayList);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener().onPartitionsAssigned(arrayList);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.activeTasks().size()), CoreMatchers.equalTo(1));
        addRecord(mockConsumer, 0L);
        createStreamThread.runOnce();
        this.clientSupplier.producers.get(0).commitTransactionException = new ProducerFencedException("Producer is fenced");
        Assert.assertThrows(TaskMigratedException.class, () -> {
            createStreamThread.rebalanceListener().onPartitionsRevoked(arrayList);
        });
        Assert.assertFalse(this.clientSupplier.producers.get(0).transactionCommitted());
        Assert.assertFalse(this.clientSupplier.producers.get(0).closed());
        Assert.assertEquals(1L, createStreamThread.activeTasks().size());
    }

    @Test
    public void shouldReinitializeRevivedTasksInAnyState() {
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)), false);
        final TopicPartition topicPartition = new TopicPartition("stream-thread-test-store-changelog", 1);
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        this.internalTopologyBuilder.addProcessor("proc", new ProcessorSupplier<Object, Object>() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.4
            public Processor<Object, Object> get() {
                return new Processor<Object, Object>() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.4.1
                    public void init(ProcessorContext processorContext) {
                    }

                    public void process(Object obj, Object obj2) {
                        if (atomicBoolean.get()) {
                            throw new TaskCorruptedException(Collections.singletonMap(StreamThreadTest.this.task1, new HashSet(Collections.singleton(topicPartition))));
                        }
                        atomicBoolean2.set(true);
                    }

                    public void close() {
                    }
                };
            }
        }, new String[]{"name"});
        this.internalTopologyBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.String(), Serdes.String()), new String[]{"proc"});
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer) createStreamThread.mainConsumer();
        mockConsumer.assign(arrayList);
        mockConsumer.updateBeginningOffsets(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.t1p1, 0L)}));
        createStreamThread.restoreConsumer().updateBeginningOffsets(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(topicPartition, 0L)}));
        createStreamThread.adminClient().updateEndOffsets(Collections.singletonMap(topicPartition, 0L));
        createStreamThread.rebalanceListener().onPartitionsAssigned(arrayList);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.activeTasks().size()), CoreMatchers.equalTo(1));
        createStreamThread.runOnce();
        addRecord(mockConsumer, 0L);
        atomicBoolean.set(true);
        createStreamThread.getClass();
        createStreamThread.taskManager().handleCorruption(Assert.assertThrows(TaskCorruptedException.class, createStreamThread::runOnce).corruptedTaskWithChangelogs());
        createStreamThread.runOnce();
        createStreamThread.runOnce();
        addRecord(mockConsumer, 0L);
        atomicBoolean.set(false);
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean2.get()), Matchers.is(false));
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean2.get()), Matchers.is(true));
    }

    @Test
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.consumer;
        mockConsumer.updatePartitions("topic1", Collections.singletonList(new PartitionInfo("topic1", 1, (Node) null, (Node[]) null, (Node[]) null)));
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = createStreamThread.mainConsumer();
        mainConsumer.assign(arrayList);
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener().onPartitionsAssigned(arrayList);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.activeTasks().size()), CoreMatchers.equalTo(1));
        MockProducer<byte[], byte[]> mockProducer = this.clientSupplier.producers.get(0);
        mockProducer.commitTransactionException = new ProducerFencedException("Producer is fenced");
        this.mockTime.sleep(this.config.getLong("commit.interval.ms").longValue() + 1);
        mockConsumer.addRecord(new ConsumerRecord("topic1", 1, 1L, new byte[0], new byte[0]));
        try {
            createStreamThread.runOnce();
            Assert.fail("Should have thrown TaskMigratedException");
        } catch (KafkaException e) {
            Assert.assertTrue(e instanceof TaskMigratedException);
            Assert.assertTrue("StreamsThread removed the fenced zombie task already, should wait for rebalance to close all zombies together.", createStreamThread.activeTasks().stream().anyMatch(task -> {
                return task.id().equals(this.task1);
            }));
        }
        MatcherAssert.assertThat(Long.valueOf(mockProducer.commitCount()), CoreMatchers.equalTo(0L));
        Assert.assertTrue(this.clientSupplier.producers.get(0).transactionInFlight());
        Assert.assertFalse(this.clientSupplier.producers.get(0).transactionCommitted());
        Assert.assertFalse(this.clientSupplier.producers.get(0).closed());
        Assert.assertEquals(1L, createStreamThread.activeTasks().size());
    }

    @Test
    public void shouldNotCloseTaskProducerWhenSuspending() {
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("out", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"name"});
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer) createStreamThread.mainConsumer();
        mockConsumer.assign(arrayList);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener().onPartitionsAssigned(arrayList);
        createStreamThread.runOnce();
        MatcherAssert.assertThat(Integer.valueOf(createStreamThread.activeTasks().size()), CoreMatchers.equalTo(1));
        addRecord(mockConsumer, 0L);
        createStreamThread.runOnce();
        createStreamThread.rebalanceListener().onPartitionsRevoked(arrayList);
        Assert.assertTrue(this.clientSupplier.producers.get(0).transactionCommitted());
        Assert.assertFalse(this.clientSupplier.producers.get(0).closed());
        Assert.assertEquals(1L, createStreamThread.activeTasks().size());
    }

    @Test
    public void shouldReturnActiveTaskMetadataWhileRunningState() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.clientSupplier.setCluster(createCluster());
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, this.config.getString("built.in.metrics.version"));
        this.internalTopologyBuilder.buildTopology();
        StreamThread create = StreamThread.create(this.internalTopologyBuilder, this.config, this.clientSupplier, this.clientSupplier.getAdmin(this.config.getAdminConfigs(CLIENT_ID)), PROCESS_ID, CLIENT_ID, streamsMetricsImpl, this.mockTime, this.streamsMetadataState, 0L, this.stateDirectory, new MockStateRestoreListener(), 1);
        create.setState(StreamThread.State.STARTING);
        create.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        create.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = create.mainConsumer();
        mainConsumer.assign(arrayList);
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        create.rebalanceListener().onPartitionsAssigned(arrayList);
        create.runOnce();
        ThreadMetadata threadMetadata = create.threadMetadata();
        Assert.assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
        Assert.assertTrue(threadMetadata.activeTasks().contains(new TaskMetadata(this.task1.toString(), Utils.mkSet(new TopicPartition[]{this.t1p1}))));
        Assert.assertTrue(threadMetadata.standbyTasks().isEmpty());
        Assert.assertTrue("#threadState() was: " + threadMetadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED", Arrays.asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(threadMetadata.threadState()));
        String threadName = threadMetadata.threadName();
        MatcherAssert.assertThat(threadName, CoreMatchers.startsWith(CLIENT_ID + "-StreamThread-1"));
        Assert.assertEquals(threadName + "-consumer", threadMetadata.consumerClientId());
        Assert.assertEquals(threadName + "-restore-consumer", threadMetadata.restoreConsumerClientId());
        Assert.assertEquals(Collections.singleton(threadName + "-producer"), threadMetadata.producerClientIds());
        Assert.assertEquals(CLIENT_ID + "-admin", threadMetadata.adminClientId());
    }

    @Test
    public void shouldReturnStandbyTaskMetadataWhileRunningState() {
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as("count-one"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, this.config, false);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.restoreConsumer;
        mockConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, (Node) null, new Node[0], new Node[0])));
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
        mockConsumer.updateEndOffsets(hashMap);
        mockConsumer.updateBeginningOffsets(hashMap);
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().handleAssignment(Collections.emptyMap(), hashMap2);
        createStreamThread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
        createStreamThread.runOnce();
        ThreadMetadata threadMetadata = createStreamThread.threadMetadata();
        Assert.assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
        Assert.assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(this.task1.toString(), Utils.mkSet(new TopicPartition[]{this.t1p1}))));
        Assert.assertTrue(threadMetadata.activeTasks().isEmpty());
    }

    @Test
    public void shouldUpdateStandbyTask() throws Exception {
        TopicPartition topicPartition = new TopicPartition("stream-thread-test-count-one-changelog", 1);
        TopicPartition topicPartition2 = new TopicPartition("stream-thread-test-table-two-changelog", 1);
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as("count-one"));
        this.internalStreamsBuilder.table("topic2", new ConsumedInternal(), new MaterializedInternal(Materialized.as("table-two"), this.internalStreamsBuilder, ""));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, this.config, false);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.restoreConsumer;
        mockConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 1, (Node) null, new Node[0], new Node[0])));
        mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition2, 10L));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition2, 0L));
        new OffsetCheckpoint(new File(this.stateDirectory.directoryForTask(this.task3), ".checkpoint")).write(Collections.singletonMap(topicPartition2, 5L));
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task3, Collections.singleton(this.t2p1));
        createStreamThread.taskManager().handleAssignment(Collections.emptyMap(), hashMap);
        createStreamThread.taskManager().tryToCompleteRestoration();
        createStreamThread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
        createStreamThread.runOnce();
        StandbyTask standbyTask = standbyTask(createStreamThread.taskManager(), this.t1p1);
        StandbyTask standbyTask2 = standbyTask(createStreamThread.taskManager(), this.t2p1);
        Assert.assertEquals(this.task1, standbyTask.id());
        Assert.assertEquals(this.task3, standbyTask2.id());
        KeyValueStore store = standbyTask.getStore("count-one");
        KeyValueStore store2 = standbyTask2.getStore("table-two");
        Assert.assertEquals(0L, store.approximateNumEntries());
        Assert.assertEquals(0L, store2.approximateNumEntries());
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 10) {
                createStreamThread.runOnce();
                Assert.assertEquals(10L, store.approximateNumEntries());
                Assert.assertEquals(4L, store2.approximateNumEntries());
                return;
            } else {
                mockConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-one-changelog", 1, j2, ("K" + j2).getBytes(), ("V" + j2).getBytes()));
                mockConsumer.addRecord(new ConsumerRecord("stream-thread-test-table-two-changelog", 1, j2, ("K" + j2).getBytes(), ("V" + j2).getBytes()));
                j = j2 + 1;
            }
        }
    }

    @Test
    public void shouldCreateStandbyTask() {
        setupInternalTopologyWithoutState();
        this.internalTopologyBuilder.addStateStore(new MockKeyValueStoreBuilder("myStore", true), new String[]{"processor1"});
        MatcherAssert.assertThat(createStandbyTask(), CoreMatchers.not(Matchers.empty()));
    }

    @Test
    public void shouldNotCreateStandbyTaskWithoutStateStores() {
        setupInternalTopologyWithoutState();
        MatcherAssert.assertThat(createStandbyTask(), Matchers.empty());
    }

    @Test
    public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled() {
        setupInternalTopologyWithoutState();
        MockKeyValueStoreBuilder mockKeyValueStoreBuilder = new MockKeyValueStoreBuilder("myStore", true);
        mockKeyValueStoreBuilder.withLoggingDisabled();
        this.internalTopologyBuilder.addStateStore(mockKeyValueStoreBuilder, new String[]{"processor1"});
        MatcherAssert.assertThat(createStandbyTask(), Matchers.empty());
    }

    @Test
    public void shouldPunctuateActiveTask() {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).process(() -> {
            return new Processor<Object, Object>() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.5
                public void init(ProcessorContext processorContext) {
                    Duration ofMillis = Duration.ofMillis(100L);
                    PunctuationType punctuationType = PunctuationType.STREAM_TIME;
                    List list = arrayList;
                    list.getClass();
                    processorContext.schedule(ofMillis, punctuationType, (v1) -> {
                        r3.add(v1);
                    });
                    Duration ofMillis2 = Duration.ofMillis(100L);
                    PunctuationType punctuationType2 = PunctuationType.WALL_CLOCK_TIME;
                    List list2 = arrayList2;
                    list2.getClass();
                    processorContext.schedule(ofMillis2, punctuationType2, (v1) -> {
                        r3.add(v1);
                    });
                }

                public void process(Object obj, Object obj2) {
                }

                public void close() {
                }
            };
        }, new String[0]);
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, this.config, false);
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        ArrayList arrayList3 = new ArrayList();
        HashMap hashMap = new HashMap();
        arrayList3.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        this.clientSupplier.consumer.assign(arrayList3);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener().onPartitionsAssigned(arrayList3);
        createStreamThread.runOnce();
        Assert.assertEquals(0L, arrayList.size());
        Assert.assertEquals(0L, arrayList2.size());
        this.mockTime.sleep(100L);
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 10) {
                createStreamThread.runOnce();
                Assert.assertEquals(1L, arrayList.size());
                Assert.assertEquals(1L, arrayList2.size());
                this.mockTime.sleep(100L);
                createStreamThread.runOnce();
                Assert.assertEquals(1L, arrayList.size());
                Assert.assertEquals(2L, arrayList2.size());
                return;
            }
            this.clientSupplier.consumer.addRecord(new ConsumerRecord("topic1", 1, j2, j2 * 100, TimestampType.CREATE_TIME, -1L, ("K" + j2).getBytes().length, ("V" + j2).getBytes().length, ("K" + j2).getBytes(), ("V" + j2).getBytes()));
            j = j2 + 1;
        }
    }

    @Test
    public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, this.config, false);
        Assert.assertEquals(StreamThread.State.CREATED.name(), createStreamThread.threadMetadata().threadState());
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
        createStreamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        createStreamThread.setState(StreamThread.State.RUNNING);
        Assert.assertEquals(StreamThread.State.RUNNING.name(), createStreamThread.threadMetadata().threadState());
    }

    @Test
    public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() {
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as("count-one"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, this.config, false);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.restoreConsumer;
        mockConsumer.updatePartitions("stream-thread-test-count-one-changelog", Arrays.asList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, (Node) null, new Node[0], new Node[0]), new PartitionInfo("stream-thread-test-count-one-changelog", 1, (Node) null, new Node[0], new Node[0])));
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
        hashMap.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
        mockConsumer.updateEndOffsets(hashMap);
        mockConsumer.updateBeginningOffsets(hashMap);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        ArrayList arrayList = new ArrayList();
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.rebalanceListener().onPartitionsRevoked(arrayList);
        assertThreadMetadataHasEmptyTasksWithState(createStreamThread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        arrayList.add(this.t1p1);
        hashMap2.put(this.task1, Collections.singleton(this.t1p1));
        hashMap3.put(this.task2, Collections.singleton(this.t1p2));
        createStreamThread.taskManager().handleAssignment(hashMap2, hashMap3);
        createStreamThread.rebalanceListener().onPartitionsAssigned(arrayList);
        assertThreadMetadataHasEmptyTasksWithState(createStreamThread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED);
    }

    private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata threadMetadata, StreamThread.State state) {
        Assert.assertEquals(state.name(), threadMetadata.threadState());
        Assert.assertTrue(threadMetadata.activeTasks().isEmpty());
        Assert.assertTrue(threadMetadata.standbyTasks().isEmpty());
    }

    @Test
    public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
        this.internalStreamsBuilder.stream(Collections.singleton("topic"), this.consumed).groupByKey().count(Materialized.as("count"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread createStreamThread = createStreamThread("clientId", this.config, false);
        MockConsumer mainConsumer = createStreamThread.mainConsumer();
        MockConsumer restoreConsumer = createStreamThread.restoreConsumer();
        MockAdminClient adminClient = createStreamThread.adminClient();
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Set singleton = Collections.singleton(topicPartition);
        HashMap hashMap = new HashMap();
        hashMap.put(new TaskId(0, 0), singleton);
        createStreamThread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        mainConsumer.updatePartitions("topic", Collections.singletonList(new PartitionInfo("topic", 0, (Node) null, new Node[0], new Node[0])));
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        restoreConsumer.updatePartitions("stream-thread-test-count-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-changelog", 0, (Node) null, new Node[0], new Node[0])));
        TopicPartition topicPartition2 = new TopicPartition("stream-thread-test-count-changelog", 0);
        final Set singleton2 = Collections.singleton(topicPartition2);
        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition2, 0L));
        adminClient.updateEndOffsets(Collections.singletonMap(topicPartition2, 2L));
        mainConsumer.schedulePollTask(() -> {
            createStreamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
            createStreamThread.rebalanceListener().onPartitionsAssigned(singleton);
        });
        try {
            createStreamThread.start();
            TestUtils.waitForCondition(() -> {
                return restoreConsumer.assignment().size() == 1;
            }, "Never get the assignment");
            restoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
            TestUtils.waitForCondition(() -> {
                return restoreConsumer.position(topicPartition2) == 1;
            }, "Never restore first record");
            restoreConsumer.setPollException(new InvalidOffsetException("Try Again!") { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.6
                public Set<TopicPartition> partitions() {
                    return singleton2;
                }
            });
            TestUtils.waitForCondition(() -> {
                return restoreConsumer.position(topicPartition2) == 0;
            }, "Never restore first record");
            restoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
            restoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 1L, "K2".getBytes(), "V2".getBytes()));
            TestUtils.waitForCondition(() -> {
                restoreConsumer.assign(singleton2);
                return restoreConsumer.position(topicPartition2) == 2;
            }, "Never finished restore");
            createStreamThread.shutdown();
            createStreamThread.join(10000L);
        } catch (Throwable th) {
            createStreamThread.shutdown();
            createStreamThread.join(10000L);
            throw th;
        }
    }

    @Test
    public void shouldLogAndNotRecordSkippedMetricForDeserializationExceptionWithBuiltInMetricsVersionLatest() {
        shouldLogAndRecordSkippedMetricForDeserializationException("latest");
    }

    @Test
    public void shouldLogAndRecordSkippedMetricForDeserializationExceptionWithBuiltInMetricsVersion0100To24() {
        shouldLogAndRecordSkippedMetricForDeserializationException("0.10.0-2.4");
    }

    /* JADX WARN: Type inference failed for: r3v8, types: [java.lang.String, long] */
    private void shouldLogAndRecordSkippedMetricForDeserializationException(String str) {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        Properties configProps = configProps(false);
        configProps.setProperty("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class.getName());
        configProps.setProperty("built.in.metrics.version", str);
        configProps.setProperty("default.value.serde", Serdes.Integer().getClass().getName());
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps), false);
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
        TaskId taskId = new TaskId(0, this.t1p1.partition());
        Set singleton = Collections.singleton(this.t1p1);
        createStreamThread.taskManager().handleAssignment(Collections.singletonMap(taskId, singleton), Collections.emptyMap());
        MockConsumer mainConsumer = createStreamThread.mainConsumer();
        mainConsumer.assign(Collections.singleton(this.t1p1));
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener().onPartitionsAssigned(singleton);
        createStreamThread.runOnce();
        if ("0.10.0-2.4".equals(str)) {
            MetricName metricName = this.metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", createStreamThread.getName()));
            MetricName metricName2 = this.metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", createStreamThread.getName()));
            Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(metricName).metricValue());
            Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(metricName2).metricValue());
        }
        ?? r3 = this.t1p1.topic();
        mainConsumer.addRecord(new ConsumerRecord((String) r3, this.t1p1.partition(), (-1) + 1, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], "I am not an integer.".getBytes()));
        mainConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), r3 + 1, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], "I am not an integer.".getBytes()));
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(RecordDeserializer.class);
        Throwable th = null;
        try {
            try {
                createStreamThread.runOnce();
                List<String> messages = createAndRegister.getMessages();
                Assert.assertTrue(messages.contains("stream-thread [" + Thread.currentThread().getName() + "] task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[0]"));
                Assert.assertTrue(messages.contains("stream-thread [" + Thread.currentThread().getName() + "] task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[1]"));
                if (createAndRegister != null) {
                    if (0 == 0) {
                        createAndRegister.close();
                        return;
                    }
                    try {
                        createAndRegister.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAndRegister != null) {
                if (th != null) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldThrowTaskMigratedExceptionHandlingTaskLost() {
        Set singleton = Collections.singleton(this.t1p1);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.LATEST);
        mockConsumer.assign(singleton);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        mockConsumer.updateEndOffsets(Collections.singletonMap(this.t1p1, 10L));
        taskManager.handleLostAll();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException("Task lost exception", new RuntimeException()));
        EasyMock.replay(new Object[]{taskManager});
        StreamThread updateThreadMetadata = new StreamThread(this.mockTime, this.config, (Admin) null, mockConsumer, mockConsumer, (ChangelogReader) null, (String) null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE)).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        mockConsumer.schedulePollTask(() -> {
            updateThreadMetadata.setState(StreamThread.State.PARTITIONS_REVOKED);
            updateThreadMetadata.rebalanceListener().onPartitionsLost(singleton);
        });
        updateThreadMetadata.setState(StreamThread.State.STARTING);
        updateThreadMetadata.getClass();
        Assert.assertThrows(TaskMigratedException.class, updateThreadMetadata::runOnce);
    }

    @Test
    public void shouldThrowTaskMigratedExceptionHandlingRevocation() {
        Set singleton = Collections.singleton(this.t1p1);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.LATEST);
        mockConsumer.assign(singleton);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        mockConsumer.updateEndOffsets(Collections.singletonMap(this.t1p1, 10L));
        taskManager.handleRevocation(singleton);
        EasyMock.expectLastCall().andThrow(new TaskMigratedException("Revocation non fatal exception", new RuntimeException()));
        EasyMock.replay(new Object[]{taskManager});
        StreamThread updateThreadMetadata = new StreamThread(this.mockTime, this.config, (Admin) null, mockConsumer, mockConsumer, (ChangelogReader) null, (String) null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE)).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        mockConsumer.schedulePollTask(() -> {
            updateThreadMetadata.setState(StreamThread.State.PARTITIONS_REVOKED);
            updateThreadMetadata.rebalanceListener().onPartitionsRevoked(singleton);
        });
        updateThreadMetadata.setState(StreamThread.State.STARTING);
        updateThreadMetadata.getClass();
        Assert.assertThrows(TaskMigratedException.class, updateThreadMetadata::runOnce);
    }

    /* JADX WARN: Type inference failed for: r0v41, types: [org.apache.kafka.streams.processor.internals.StreamThreadTest$7] */
    @Test
    public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        Consumer consumer = (Consumer) EasyMock.mock(Consumer.class);
        Task task = (Task) EasyMock.mock(Task.class);
        Task task2 = (Task) EasyMock.mock(Task.class);
        TaskId taskId = new TaskId(0, 0);
        TaskId taskId2 = new TaskId(0, 2);
        final Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(taskId, Collections.emptySet())});
        EasyMock.expect(task.state()).andReturn(Task.State.RUNNING).anyTimes();
        EasyMock.expect(task.id()).andReturn(taskId).anyTimes();
        EasyMock.expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
        EasyMock.expect(task2.id()).andReturn(taskId2).anyTimes();
        taskManager.handleCorruption(mkMap);
        EasyMock.replay(new Object[]{task, task2, taskManager});
        StreamThread updateThreadMetadata = new StreamThread(this.mockTime, this.config, null, consumer, consumer, null, null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE)) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.7
            void runOnce() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(mkMap);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        updateThreadMetadata.setState(StreamThread.State.STARTING);
        updateThreadMetadata.runLoop();
        EasyMock.verify(new Object[]{taskManager});
    }

    /* JADX WARN: Type inference failed for: r0v45, types: [org.apache.kafka.streams.processor.internals.StreamThreadTest$8] */
    @Test
    public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() {
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        Consumer consumer = (Consumer) EasyMock.mock(Consumer.class);
        Task task = (Task) EasyMock.mock(Task.class);
        Task task2 = (Task) EasyMock.mock(Task.class);
        TaskId taskId = new TaskId(0, 0);
        TaskId taskId2 = new TaskId(0, 2);
        final Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(taskId, Collections.emptySet())});
        EasyMock.expect(task.state()).andReturn(Task.State.RUNNING).anyTimes();
        EasyMock.expect(task.id()).andReturn(taskId).anyTimes();
        EasyMock.expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
        EasyMock.expect(task2.id()).andReturn(taskId2).anyTimes();
        taskManager.handleCorruption(mkMap);
        EasyMock.expectLastCall().andThrow(new TaskMigratedException("Task migrated", new RuntimeException("non-corrupted task migrated")));
        taskManager.handleLostAll();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{task, task2, taskManager});
        StreamThread updateThreadMetadata = new StreamThread(this.mockTime, this.config, null, consumer, consumer, null, null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE)) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.8
            void runOnce() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(mkMap);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        updateThreadMetadata.setState(StreamThread.State.STARTING);
        updateThreadMetadata.runLoop();
        EasyMock.verify(new Object[]{taskManager});
    }

    @Test
    public void shouldNotCommitNonRunningNonRestoringTasks() {
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        Consumer consumer = (Consumer) EasyMock.mock(Consumer.class);
        Task task = (Task) EasyMock.mock(Task.class);
        Task task2 = (Task) EasyMock.mock(Task.class);
        Task task3 = (Task) EasyMock.mock(Task.class);
        TaskId taskId = new TaskId(0, 1);
        TaskId taskId2 = new TaskId(0, 2);
        TaskId taskId3 = new TaskId(0, 3);
        EasyMock.expect(task.state()).andReturn(Task.State.RUNNING).anyTimes();
        EasyMock.expect(task.id()).andReturn(taskId).anyTimes();
        EasyMock.expect(task2.state()).andReturn(Task.State.RESTORING).anyTimes();
        EasyMock.expect(task2.id()).andReturn(taskId2).anyTimes();
        EasyMock.expect(task3.state()).andReturn(Task.State.CREATED).anyTimes();
        EasyMock.expect(task3.id()).andReturn(taskId3).anyTimes();
        EasyMock.expect(taskManager.tasks()).andReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(taskId, task), Utils.mkEntry(taskId2, task2), Utils.mkEntry(taskId3, task3)})).anyTimes();
        EasyMock.expect(Integer.valueOf(taskManager.commit(Utils.mkSet(new Task[]{task, task2})))).andReturn(2).times(1);
        StreamThread streamThread = new StreamThread(this.mockTime, this.config, (Admin) null, consumer, consumer, (ChangelogReader) null, (String) null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE));
        EasyMock.replay(new Object[]{task, task2, task3, taskManager});
        streamThread.setNow(this.mockTime.milliseconds());
        streamThread.maybeCommit();
        EasyMock.verify(new Object[]{taskManager});
    }

    @Test
    public void shouldLogAndRecordSkippedRecordsForInvalidTimestampsWithBuiltInMetricsVersion0100To24() {
        shouldLogAndRecordSkippedRecordsForInvalidTimestamps("0.10.0-2.4");
    }

    @Test
    public void shouldLogAndNotRecordSkippedRecordsForInvalidTimestampsWithBuiltInMetricsVersionLatest() {
        shouldLogAndRecordSkippedRecordsForInvalidTimestamps("latest");
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(String str) {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        Properties configProps = configProps(false);
        configProps.setProperty("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class.getName());
        configProps.setProperty("built.in.metrics.version", str);
        StreamThread createStreamThread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps), false);
        createStreamThread.setState(StreamThread.State.STARTING);
        createStreamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
        TaskId taskId = new TaskId(0, this.t1p1.partition());
        Set singleton = Collections.singleton(this.t1p1);
        createStreamThread.taskManager().handleAssignment(Collections.singletonMap(taskId, singleton), Collections.emptyMap());
        MockConsumer mainConsumer = createStreamThread.mainConsumer();
        mainConsumer.assign(Collections.singleton(this.t1p1));
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener().onPartitionsAssigned(singleton);
        createStreamThread.runOnce();
        MetricName metricName = this.metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", createStreamThread.getName()));
        MetricName metricName2 = this.metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", createStreamThread.getName()));
        if ("0.10.0-2.4".equals(str)) {
            Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(metricName).metricValue());
            Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(metricName2).metricValue());
        }
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(RecordQueue.class);
        Throwable th = null;
        try {
            try {
                long j = (-1) + 1;
                addRecord(mainConsumer, j);
                addRecord(mainConsumer, j + 1);
                createStreamThread.runOnce();
                if ("0.10.0-2.4".equals(str)) {
                    Assert.assertEquals(Double.valueOf(2.0d), this.metrics.metric(metricName).metricValue());
                    Assert.assertNotEquals(Double.valueOf(0.0d), this.metrics.metric(metricName2).metricValue());
                }
                addRecord(mainConsumer, this + 1);
                addRecord(mainConsumer, this + 1);
                addRecord(mainConsumer, this + 1);
                addRecord(mainConsumer, this + 1);
                createStreamThread.runOnce();
                if ("0.10.0-2.4".equals(str)) {
                    Assert.assertEquals(Double.valueOf(6.0d), this.metrics.metric(metricName).metricValue());
                    Assert.assertNotEquals(Double.valueOf(0.0d), this.metrics.metric(metricName2).metricValue());
                }
                addRecord(mainConsumer, this + 1, 1L);
                addRecord(mainConsumer, this + 1, 1L);
                createStreamThread.runOnce();
                List<String> messages = createAndRegister.getMessages();
                String str2 = "stream-thread [" + Thread.currentThread().getName() + "] task [0_1] ";
                Assert.assertTrue(messages.contains(str2 + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[0] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
                Assert.assertTrue(messages.contains(str2 + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[1] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
                Assert.assertTrue(messages.contains(str2 + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[2] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
                Assert.assertTrue(messages.contains(str2 + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[3] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
                Assert.assertTrue(messages.contains(str2 + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[4] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
                Assert.assertTrue(messages.contains(str2 + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[5] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
                if (createAndRegister != null) {
                    if (0 != 0) {
                        try {
                            createAndRegister.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAndRegister.close();
                    }
                }
                if ("0.10.0-2.4".equals(str)) {
                    Assert.assertEquals(Double.valueOf(6.0d), this.metrics.metric(metricName).metricValue());
                    Assert.assertNotEquals(Double.valueOf(0.0d), this.metrics.metric(metricName2).metricValue());
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAndRegister != null) {
                if (th != null) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldTransmitTaskManagerMetrics() {
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        MetricName metricName = new MetricName("test_metric", "", "", new HashMap());
        Map singletonMap = Collections.singletonMap(metricName, new KafkaMetric(new Object(), metricName, (metricConfig, j) -> {
            return 0.0d;
        }, (MetricConfig) null, new MockTime()));
        EasyMock.expect(taskManager.producerMetrics()).andReturn(singletonMap);
        EasyMock.replay(new Object[]{taskManager, consumer});
        MatcherAssert.assertThat(singletonMap, Matchers.is(new StreamThread(this.mockTime, new StreamsConfig(configProps(true)), (Admin) null, consumer, consumer, (ChangelogReader) null, (String) null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE)).producerMetrics()));
    }

    @Test
    public void shouldConstructAdminMetrics() {
        MockAdminClient build = new MockAdminClient.Builder().brokers(Arrays.asList(new Node(0, "dummyHost-1", 1234), new Node(1, "dummyHost-2", 1234))).clusterId((String) null).build();
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        StreamThread streamThread = new StreamThread(this.mockTime, this.config, build, consumer, consumer, (ChangelogReader) null, (String) null, taskManager, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.internalTopologyBuilder, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE));
        MetricName metricName = new MetricName("test_metric", "", "", new HashMap());
        KafkaMetric kafkaMetric = new KafkaMetric(new Object(), metricName, (metricConfig, j) -> {
            return 0.0d;
        }, (MetricConfig) null, new MockTime());
        EasyMock.replay(new Object[]{taskManager, consumer});
        build.setMockMetrics(metricName, kafkaMetric);
        Assert.assertEquals(metricName, ((Metric) streamThread.adminClientMetrics().get(metricName)).metricName());
    }

    private TaskManager mockTaskManagerCommit(Consumer<byte[], byte[]> consumer, int i, int i2) {
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        Task task = (Task) EasyMock.mock(Task.class);
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect(task.state()).andReturn(Task.State.RUNNING).anyTimes();
        EasyMock.expect(task.id()).andReturn(taskId).anyTimes();
        EasyMock.expect(taskManager.tasks()).andReturn(Collections.singletonMap(taskId, task)).times(i);
        EasyMock.expect(Integer.valueOf(taskManager.commit(Collections.singleton(task)))).andReturn(Integer.valueOf(i2)).times(i);
        EasyMock.replay(new Object[]{taskManager, consumer, task});
        return taskManager;
    }

    private void setupInternalTopologyWithoutState() {
        MockProcessor mockProcessor = new MockProcessor();
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.internalTopologyBuilder.addProcessor("processor1", () -> {
            return mockProcessor;
        }, new String[]{"source1"});
    }

    private Collection<Task> createStandbyTask() {
        Logger logger = new LogContext("test").logger(StreamThreadTest.class);
        return new StandbyTaskCreator(this.internalTopologyBuilder, this.config, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest"), this.stateDirectory, new MockChangelogReader(), CLIENT_ID, logger).createTasks(Collections.singletonMap(new TaskId(1, 2), Collections.emptySet()));
    }

    private void addRecord(MockConsumer<byte[], byte[]> mockConsumer, long j) {
        addRecord(mockConsumer, j, -1L);
    }

    private void addRecord(MockConsumer<byte[], byte[]> mockConsumer, long j, long j2) {
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), j, j2, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], new byte[0]));
    }

    StandbyTask standbyTask(TaskManager taskManager, TopicPartition topicPartition) {
        Stream filter = taskManager.tasks().values().stream().filter(task -> {
            return !task.isActive();
        });
        filter.getClass();
        Iterable<StandbyTask> iterable = filter::iterator;
        for (StandbyTask standbyTask : iterable) {
            if (standbyTask.inputPartitions().contains(topicPartition)) {
                return standbyTask;
            }
        }
        return null;
    }
}
