package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockStandbyUpdateListener;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest.class */
public class StreamThreadTest {
    private static final String APPLICATION_ID = "stream-thread-test";
    public static final String STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG = "stream-thread-test-count-one-changelog";
    public static final String STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG = "stream-thread-test-table-two-changelog";

    @Mock
    private Consumer<byte[], byte[]> consumer;
    private static final UUID PROCESS_ID = UUID.fromString("87bf53a8-54f2-485f-a4b6-acdbec0a8b3d");
    private static final String CLIENT_ID = "stream-thread-test-" + PROCESS_ID;
    private static final BiConsumer<Throwable, Boolean> HANDLER = (th, bool) -> {
        if (th instanceof RuntimeException) {
            throw ((RuntimeException) th);
        }
        if (!(th instanceof Error)) {
            throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", th);
        }
        throw ((Error) th);
    };

    @Rule
    public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);

    @Parameterized.Parameter(0)
    public boolean stateUpdaterEnabled = true;

    @Parameterized.Parameter(1)
    public boolean processingThreadsEnabled = true;
    private final int threadIdx = 1;
    private final Metrics metrics = new Metrics();
    private final MockTime mockTime = new MockTime();
    private final String stateDir = TestUtils.tempDirectory().getPath();
    private final MockClientSupplier clientSupplier = new MockClientSupplier();
    private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
    private final ChangelogReader changelogReader = new MockChangelogReader();
    private StateDirectory stateDirectory = null;
    private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(this.internalTopologyBuilder);
    private StreamThread thread = null;
    private final Consumer<byte[], byte[]> mainConsumer = (Consumer) Mockito.mock(Consumer.class);
    private final String topic1 = AssignmentTestUtils.TP_1_NAME;
    private final String topic2 = AssignmentTestUtils.TP_2_NAME;
    private final TopicPartition t1p1 = new TopicPartition(AssignmentTestUtils.TP_1_NAME, 1);
    private final TopicPartition t1p2 = new TopicPartition(AssignmentTestUtils.TP_1_NAME, 2);
    private final TopicPartition t2p1 = new TopicPartition(AssignmentTestUtils.TP_2_NAME, 1);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(1, 1);

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest$MockConsumerClientSupplier.class */
    private static class MockConsumerClientSupplier extends MockClientSupplier {
        final Consumer<byte[], byte[]> mockConsumer;
        final Map<String, Object> consumerConfigs = new HashMap();

        MockConsumerClientSupplier(Consumer<byte[], byte[]> consumer) {
            this.mockConsumer = consumer;
        }

        @Override // org.apache.kafka.test.MockClientSupplier
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> map) {
            this.consumerConfigs.putAll(map);
            return this.mockConsumer;
        }

        AtomicLong nextRebalanceMs() {
            return ((ReferenceContainer) this.consumerConfigs.get("__reference.container.instance__")).nextScheduledRebalanceMs;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest$StateListenerStub.class */
    private static class StateListenerStub implements StreamThread.StateListener {
        int numChanges;
        ThreadStateTransitionValidator oldState;
        ThreadStateTransitionValidator newState;

        private StateListenerStub() {
            this.numChanges = 0;
            this.oldState = null;
            this.newState = null;
        }

        public void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2) {
            this.numChanges++;
            if (this.newState != null && !this.newState.equals(threadStateTransitionValidator2)) {
                throw new RuntimeException("State mismatch " + threadStateTransitionValidator2 + " different from " + this.newState);
            }
            this.oldState = threadStateTransitionValidator2;
            this.newState = threadStateTransitionValidator;
        }
    }

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{false, false}, new Object[]{true, false}, new Object[]{true, true});
    }

    @Before
    public void setUp() {
        Thread.currentThread().setName(CLIENT_ID + "-StreamThread-1");
        this.internalTopologyBuilder.setApplicationId(APPLICATION_ID);
    }

    @After
    public void tearDown() {
        if (this.thread != null) {
            this.thread.shutdown();
            this.thread = null;
        }
        this.stateDirectory = null;
    }

    private Properties configProps(boolean z) {
        Map.Entry[] entryArr = new Map.Entry[10];
        entryArr[0] = Utils.mkEntry("application.id", APPLICATION_ID);
        entryArr[1] = Utils.mkEntry("bootstrap.servers", "localhost:2171");
        entryArr[2] = Utils.mkEntry("buffered.records.per.partition", "3");
        entryArr[3] = Utils.mkEntry("default.timestamp.extractor", MockTimestampExtractor.class.getName());
        entryArr[4] = Utils.mkEntry("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        entryArr[5] = Utils.mkEntry("processing.guarantee", z ? "exactly_once_v2" : "at_least_once");
        entryArr[6] = Utils.mkEntry("default.key.serde", Serdes.ByteArraySerde.class.getName());
        entryArr[7] = Utils.mkEntry("default.value.serde", Serdes.ByteArraySerde.class.getName());
        entryArr[8] = Utils.mkEntry("__state.updater.enabled__", Boolean.toString(this.stateUpdaterEnabled));
        entryArr[9] = Utils.mkEntry("__processing.threads.enabled__", Boolean.toString(this.processingThreadsEnabled));
        return Utils.mkProperties(Utils.mkMap(entryArr));
    }

    private Cluster createCluster() {
        Node node = new Node(-1, "localhost", 8121);
        return new Cluster("mockClusterId", Collections.singletonList(node), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), node);
    }

    private StreamThread createStreamThread(String str) {
        return createStreamThread(str, (Time) this.mockTime);
    }

    private StreamThread createStreamThread(String str, Time time) {
        return createStreamThread(str, new StreamsConfig(configProps(false)), time);
    }

    private StreamThread createStreamThread(String str, StreamsConfig streamsConfig) {
        return createStreamThread(str, streamsConfig, this.mockTime);
    }

    private StreamThread createStreamThread(String str, StreamsConfig streamsConfig, Time time) {
        if (!"at_least_once".equals(streamsConfig.getString("processing.guarantee"))) {
            this.clientSupplier.setApplicationIdForProducer(APPLICATION_ID);
        }
        this.clientSupplier.setCluster(createCluster());
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, streamsConfig.getString("built.in.metrics.version"), time);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.stateDirectory = new StateDirectory(streamsConfig, this.mockTime, true, false);
        return StreamThread.create(topologyMetadata, streamsConfig, this.clientSupplier, this.clientSupplier.getAdmin(streamsConfig.getAdminConfigs(str)), PROCESS_ID, str, streamsMetricsImpl, time, new StreamsMetadataState(new TopologyMetadata(this.internalTopologyBuilder, streamsConfig), StreamsMetadataState.UNKNOWN_HOST, new LogContext(String.format("stream-client [%s] ", CLIENT_ID))), 0L, this.stateDirectory, new MockStateRestoreListener(), new MockStandbyUpdateListener(), 1, (Runnable) null, HANDLER);
    }

    @Test
    public void shouldChangeStateInRebalanceListener() {
        this.thread = createStreamThread(CLIENT_ID);
        StateListenerStub stateListenerStub = new StateListenerStub();
        this.thread.setStateListener(stateListenerStub);
        Assert.assertEquals(this.thread.state(), StreamThread.State.CREATED);
        ConsumerRebalanceListener rebalanceListener = this.thread.rebalanceListener();
        this.thread.setState(StreamThread.State.STARTING);
        rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        Assert.assertEquals(this.thread.state(), StreamThread.State.PARTITIONS_REVOKED);
        List singletonList = Collections.singletonList(this.t1p1);
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(singletonList);
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        rebalanceListener.onPartitionsAssigned(singletonList);
        runOnce();
        Assert.assertEquals(this.thread.state(), StreamThread.State.RUNNING);
        Assert.assertEquals(4L, stateListenerStub.numChanges);
        Assert.assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListenerStub.oldState);
        this.thread.shutdown();
        Assert.assertSame(StreamThread.State.PENDING_SHUTDOWN, this.thread.state());
    }

    @Test
    public void shouldChangeStateAtStartClose() throws Exception {
        this.thread = createStreamThread(CLIENT_ID, (Time) new MockTime(1L));
        this.thread.setStateListener(new StateListenerStub());
        this.thread.start();
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.STARTING;
        }, 10000L, "Thread never started.");
        this.thread.shutdown();
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.DEAD;
        }, 10000L, "Thread never shut down.");
        this.thread.shutdown();
        Assert.assertEquals(this.thread.state(), StreamThread.State.DEAD);
    }

    @Test
    public void shouldCreateMetricsAtStartup() {
        this.thread = createStreamThread(CLIENT_ID, (Time) new MockTime(1L));
        Map singletonMap = Collections.singletonMap("thread-id", this.thread.getName());
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-total", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-ratio", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-avg", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-max", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-rate", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-total", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-ratio", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-records-avg", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-records-max", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-avg", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-max", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-rate", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-total", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-ratio", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-records-avg", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-records-max", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-avg", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-max", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-rate", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-total", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-ratio", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-rate", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-total", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-rate", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-total", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-rate", "stream-thread-metrics", "", singletonMap)));
        Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-total", "stream-thread-metrics", "", singletonMap)));
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("task-id", "all"), Utils.mkEntry("thread-id", this.thread.getName())});
        Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", "stream-task-metrics", "", mkMap)));
        Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", "stream-task-metrics", "", mkMap)));
        Assert.assertNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-task-metrics", "", mkMap)));
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.contextChange(new KafkaMetricsContext("kafka.streams"));
        this.metrics.addReporter(jmxReporter);
        Assert.assertEquals(CLIENT_ID + "-StreamThread-1", this.thread.getName());
        Assert.assertTrue(jmxReporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s", "stream-thread-metrics", "thread-id", this.thread.getName())));
        Assert.assertFalse(jmxReporter.containsMbean(String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=all", "thread-id", this.thread.getName())));
    }

    @Test
    public void shouldNotCommitBeforeTheCommitInterval() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Task task = (Task) Mockito.mock(Task.class);
        TaskManager mockTaskManagerCommit = mockTaskManagerCommit(task, 1);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, mockTaskManagerCommit, streamsConfig, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        this.mockTime.sleep(990L);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager) Mockito.verify(mockTaskManagerCommit)).commit(Utils.mkSet(new Task[]{task}));
    }

    @Test
    public void shouldNotPurgeBeforeThePurgeInterval() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(1000L));
        configProps.setProperty("repartition.purge.interval.ms", Long.toString(2000L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager mockTaskManagerPurge = mockTaskManagerPurge();
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, mockTaskManagerPurge, streamsConfig, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        this.mockTime.sleep(1990L);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager) Mockito.verify(mockTaskManagerPurge)).maybePurgeCommittedRecords();
    }

    @Test
    public void shouldAlsoPurgeWhenNothingGetsCommitted() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(1000L));
        configProps.setProperty("repartition.purge.interval.ms", Long.toString(1000L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        Consumer<byte[], byte[]> consumer = (Consumer) Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskId taskId = new TaskId(0, 0);
        StreamTask build = StreamsTestUtils.TaskBuilder.statelessTask(taskId).inState(Task.State.RUNNING).build();
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        Mockito.when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, build));
        Mockito.when(Integer.valueOf(taskManager.commit(Collections.singleton(build)))).thenReturn(0);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(consumer, taskManager, streamsConfig, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.mockTime.sleep(1010L);
        this.thread.maybeCommit();
        ((TaskManager) Mockito.verify(taskManager)).maybePurgeCommittedRecords();
    }

    @Test
    public void shouldNotProcessWhenPartitionRevoked() {
        Assume.assumeFalse(this.processingThreadsEnabled);
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        Mockito.when(this.mainConsumer.poll((Duration) Mockito.any())).thenReturn(ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.mainConsumer, taskManager, streamsConfig, topologyMetadata);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        this.thread.runOnceWithoutProcessingThreads();
        ((TaskManager) Mockito.verify(taskManager, Mockito.never())).process(Mockito.anyInt(), (Time) Mockito.any());
    }

    @Test
    public void shouldProcessWhenRunning() {
        Assume.assumeFalse(this.processingThreadsEnabled);
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        Mockito.when(this.mainConsumer.poll((Duration) Mockito.any())).thenReturn(ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.mainConsumer, taskManager, streamsConfig, topologyMetadata);
        this.thread.updateThreadMetadata("admin");
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.thread.setState(StreamThread.State.RUNNING);
        runOnce();
        ((TaskManager) Mockito.verify(taskManager)).process(Mockito.anyInt(), (Time) Mockito.any());
    }

    @Test
    public void shouldProcessWhenPartitionAssigned() {
        Assume.assumeTrue(this.stateUpdaterEnabled);
        Assume.assumeFalse(this.processingThreadsEnabled);
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        Mockito.when(this.mainConsumer.poll((Duration) Mockito.any())).thenReturn(ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.mainConsumer, taskManager, streamsConfig, topologyMetadata);
        this.thread.updateThreadMetadata("admin");
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        runOnce();
        ((TaskManager) Mockito.verify(taskManager)).process(Mockito.anyInt(), (Time) Mockito.any());
    }

    @Test
    public void shouldProcessWhenStarting() {
        Assume.assumeTrue(this.stateUpdaterEnabled);
        Assume.assumeFalse(this.processingThreadsEnabled);
        Properties configProps = configProps(false);
        configProps.setProperty("__state.updater.enabled__", Boolean.toString(true));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        Mockito.when(this.mainConsumer.poll((Duration) Mockito.any())).thenReturn(ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.mainConsumer, taskManager, streamsConfig, topologyMetadata);
        this.thread.updateThreadMetadata("admin");
        this.thread.setState(StreamThread.State.STARTING);
        runOnce();
        ((TaskManager) Mockito.verify(taskManager)).process(Mockito.anyInt(), (Time) Mockito.any());
    }

    @Test
    public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() throws InterruptedException {
        MockTime mockTime = new MockTime(1L);
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, streamsConfig.getString("built.in.metrics.version"), mockTime);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.lenient().when(this.consumer.poll((Duration) ArgumentMatchers.any())).thenReturn(ConsumerRecords.empty());
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        MockConsumerClientSupplier mockConsumerClientSupplier = new MockConsumerClientSupplier(this.consumer);
        mockConsumerClientSupplier.setCluster(createCluster());
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.stateDirectory = new StateDirectory(streamsConfig, mockTime, true, false);
        this.thread = StreamThread.create(topologyMetadata, streamsConfig, mockConsumerClientSupplier, mockConsumerClientSupplier.getAdmin(streamsConfig.getAdminConfigs(CLIENT_ID)), PROCESS_ID, CLIENT_ID, streamsMetricsImpl, mockTime, new StreamsMetadataState(new TopologyMetadata(this.internalTopologyBuilder, streamsConfig), StreamsMetadataState.UNKNOWN_HOST, new LogContext(String.format("stream-client [%s] ", CLIENT_ID))), 0L, this.stateDirectory, new MockStateRestoreListener(), new MockStandbyUpdateListener(), 1, (Runnable) null, (BiConsumer) Mockito.mock(BiConsumer.class));
        mockConsumerClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1);
        this.thread.start();
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.STARTING;
        }, 10000L, "Thread never started.");
        this.thread.shutdown();
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.DEAD;
        }, 10000L, "Thread never shut down.");
        ((Consumer) Mockito.verify(this.consumer, Mockito.atMostOnce())).enforceRebalance(ArgumentMatchers.anyString());
    }

    @Test
    public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing() throws InterruptedException {
        MockTime mockTime = new MockTime(1L);
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, streamsConfig.getString("built.in.metrics.version"), mockTime);
        Mockito.when(this.consumer.poll((Duration) ArgumentMatchers.any())).thenReturn(ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        MockConsumerClientSupplier mockConsumerClientSupplier = new MockConsumerClientSupplier(this.consumer);
        mockConsumerClientSupplier.setCluster(createCluster());
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.stateDirectory = new StateDirectory(streamsConfig, mockTime, true, false);
        this.thread = StreamThread.create(topologyMetadata, streamsConfig, mockConsumerClientSupplier, mockConsumerClientSupplier.getAdmin(streamsConfig.getAdminConfigs(CLIENT_ID)), PROCESS_ID, CLIENT_ID, streamsMetricsImpl, mockTime, new StreamsMetadataState(new TopologyMetadata(this.internalTopologyBuilder, streamsConfig), StreamsMetadataState.UNKNOWN_HOST, new LogContext(String.format("stream-client [%s] ", CLIENT_ID))), 0L, this.stateDirectory, new MockStateRestoreListener(), new MockStandbyUpdateListener(), 1, (Runnable) null, (BiConsumer) null);
        mockConsumerClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1);
        this.thread.taskManager().handleRebalanceStart(Collections.emptySet());
        this.thread.start();
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.STARTING;
        }, 10000L, "Thread never started.");
        TestUtils.retryOnExceptionWithTimeout(() -> {
        });
        this.thread.shutdown();
        MatcherAssert.assertThat(Long.valueOf(mockConsumerClientSupplier.nextRebalanceMs().get()), CoreMatchers.not(0L));
        this.thread.taskManager().handleRebalanceComplete();
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.DEAD;
        }, 10000L, "Thread never shut down.");
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r5v4, types: [long, java.lang.String] */
    @Test
    public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads() {
        Assume.assumeFalse(this.processingThreadsEnabled);
        LinkedList linkedList = new LinkedList();
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.internalTopologyBuilder.addProcessor("processor1", () -> {
            MockApiProcessor mockApiProcessor = new MockApiProcessor(PunctuationType.WALL_CLOCK_TIME, 10L);
            linkedList.add(mockApiProcessor);
            return mockApiProcessor;
        }, new String[]{"source1"});
        this.internalTopologyBuilder.addProcessor("processor2", () -> {
            return new MockApiProcessor(PunctuationType.STREAM_TIME, 10L);
        }, new String[]{"source1"});
        Properties properties = new Properties();
        properties.put("__state.updater.enabled__", Boolean.valueOf(this.stateUpdaterEnabled));
        properties.put("__processing.threads.enabled__", Boolean.valueOf(this.processingThreadsEnabled));
        properties.put("commit.interval.ms", 100L);
        String name = Serdes.ByteArraySerde.class.getName();
        ?? name2 = Serdes.ByteArraySerde.class.getName();
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(StreamsTestUtils.getStreamsConfig(APPLICATION_ID, "localhost:2171", name, name2, properties)));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        TaskId taskId = new TaskId(0, this.t1p1.partition());
        Set singleton = Collections.singleton(this.t1p1);
        this.thread.taskManager().handleAssignment(Collections.singletonMap(taskId, singleton), Collections.emptyMap());
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(Collections.singleton(this.t1p1));
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(singleton);
        runOnce();
        addRecord(mainConsumer, (-1) + 1, 0L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(1));
        addRecord(mainConsumer, name2 + 1, 1L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(2));
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(2));
        this.mockTime.sleep(11L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(2));
        this.mockTime.sleep(11L);
        addRecord(mainConsumer, this + 1, 5L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(1));
        addRecord(mainConsumer, this + 1, 5L);
        addRecord(mainConsumer, this + 1, 6L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(3));
        addRecord(mainConsumer, this + 1, 11L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(1));
        addRecord(mainConsumer, this + 1, 12L);
        addRecord(mainConsumer, this + 1, 13L);
        addRecord(mainConsumer, this + 1, 14L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(3));
        linkedList.forEach((v0) -> {
            v0.requestCommit();
        });
        addRecord(mainConsumer, this + 1, 15L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(1));
        addRecord(mainConsumer, this + 1, 15L);
        addRecord(mainConsumer, this + 1, 16L);
        addRecord(mainConsumer, this + 1, 17L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(3));
        this.mockTime.sleep(90L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(3));
        this.mockTime.sleep(90L);
        addRecord(mainConsumer, this + 1, 18L);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.currentNumIterations()), CoreMatchers.equalTo(1));
    }

    @Test
    public void shouldNotCauseExceptionIfNothingCommitted() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Task task = (Task) Mockito.mock(Task.class);
        TaskManager mockTaskManagerCommit = mockTaskManagerCommit(task, 0);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, mockTaskManagerCommit, streamsConfig, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        this.mockTime.sleep(990L);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager) Mockito.verify(mockTaskManagerCommit)).commit(Utils.mkSet(new Task[]{task}));
    }

    @Test
    public void shouldCommitAfterCommitInterval() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(100L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        TaskManager taskManager = new TaskManager(null, null, null, null, null, null, new Tasks(new LogContext()), topologyMetadata, null, null, null, null) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.1
            int commit(Collection<Task> collection) {
                atomicBoolean.set(true);
                StreamThreadTest.this.mockTime.sleep(10L);
                return 1;
            }
        };
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, taskManager, streamsConfig, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        Assert.assertTrue(atomicBoolean.get());
        this.mockTime.sleep(100L);
        atomicBoolean.set(false);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        Assert.assertFalse(atomicBoolean.get());
        this.mockTime.sleep(1L);
        atomicBoolean.set(false);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void shouldPurgeAfterPurgeInterval() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(100L));
        configProps.setProperty("repartition.purge.interval.ms", Long.toString(200L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager mockTaskManagerPurge = mockTaskManagerPurge();
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, mockTaskManagerPurge, streamsConfig, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        this.mockTime.sleep(201L);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager) Mockito.verify(mockTaskManagerPurge, Mockito.times(2))).maybePurgeCommittedRecords();
    }

    @Test
    public void shouldRecordCommitLatency() {
        DefaultTaskManager defaultTaskManager;
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Mockito.when(this.consumer.poll((Duration) ArgumentMatchers.any())).thenReturn(ConsumerRecords.empty());
        Task task = (Task) Mockito.mock(Task.class);
        ActiveTaskCreator activeTaskCreator = (ActiveTaskCreator) Mockito.mock(ActiveTaskCreator.class);
        Mockito.when(activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) ArgumentMatchers.any())).thenReturn(Collections.singleton(task));
        Mockito.when(activeTaskCreator.producerClientIds()).thenReturn(Collections.singleton("producerClientId"));
        StandbyTaskCreator standbyTaskCreator = (StandbyTaskCreator) Mockito.mock(StandbyTaskCreator.class);
        StateUpdater stateUpdater = (StateUpdater) Mockito.mock(StateUpdater.class);
        if (this.processingThreadsEnabled) {
            defaultTaskManager = (DefaultTaskManager) Mockito.mock(DefaultTaskManager.class);
            new KafkaFutureImpl().complete((Object) null);
        } else {
            defaultTaskManager = null;
        }
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        TaskManager taskManager = new TaskManager(null, this.changelogReader, null, null, activeTaskCreator, standbyTaskCreator, new Tasks(new LogContext()), topologyMetadata, null, null, stateUpdater, defaultTaskManager) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.2
            int commit(Collection<Task> collection) {
                StreamThreadTest.this.mockTime.sleep(10L);
                return 1;
            }
        };
        taskManager.setMainConsumer(this.consumer);
        this.thread = buildStreamThread(this.consumer, taskManager, streamsConfig, topologyMetadata);
        this.thread.updateThreadMetadata("adminClientId");
        this.thread.setState(StreamThread.State.STARTING);
        HashMap hashMap = new HashMap();
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        this.thread.rebalanceListener().onPartitionsAssigned(Collections.singleton(this.t1p1));
        Assert.assertTrue(Double.isNaN(((Double) ((Metric) streamsMetricsImpl.metrics().get(new MetricName("commit-latency-max", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue()).doubleValue()));
        Assert.assertTrue(Double.isNaN(((Double) ((Metric) streamsMetricsImpl.metrics().get(new MetricName("commit-latency-avg", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue()).doubleValue()));
        runOnce();
        MatcherAssert.assertThat(((Metric) streamsMetricsImpl.metrics().get(new MetricName("commit-latency-max", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
        MatcherAssert.assertThat(((Metric) streamsMetricsImpl.metrics().get(new MetricName("commit-latency-avg", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue(), CoreMatchers.equalTo(Double.valueOf(10.0d)));
    }

    @Test
    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(arrayList);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.t1p1, 0L);
        hashMap2.put(this.t1p2, 0L);
        mainConsumer.updateBeginningOffsets(hashMap2);
        this.thread.rebalanceListener().onPartitionsAssigned(new HashSet(arrayList));
        Assert.assertEquals(1L, this.clientSupplier.producers.size());
        Producer producer = this.clientSupplier.producers.get(0);
        Iterator it = this.thread.readOnlyActiveTasks().iterator();
        while (it.hasNext()) {
            Assert.assertSame(producer, ((Task) it.next()).recordCollector().producer());
        }
        Assert.assertSame(this.clientSupplier.consumer, this.thread.mainConsumer());
        Assert.assertSame(this.clientSupplier.restoreConsumer, this.thread.restoreConsumer());
    }

    @Test
    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosAlphaEnabled() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        Properties configProps = configProps(true);
        configProps.put("processing.guarantee", "exactly_once");
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(arrayList);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.t1p1, 0L);
        hashMap2.put(this.t1p2, 0L);
        mainConsumer.updateBeginningOffsets(hashMap2);
        this.thread.rebalanceListener().onPartitionsAssigned(new HashSet(arrayList));
        runOnce();
        Assert.assertEquals(this.thread.readOnlyActiveTasks().size(), this.clientSupplier.producers.size());
        Assert.assertSame(this.clientSupplier.consumer, this.thread.mainConsumer());
        Assert.assertSame(this.clientSupplier.restoreConsumer, this.thread.restoreConsumer());
    }

    @Test
    public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabled() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(arrayList);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.t1p1, 0L);
        hashMap2.put(this.t1p2, 0L);
        mainConsumer.updateBeginningOffsets(hashMap2);
        this.thread.rebalanceListener().onPartitionsAssigned(new HashSet(arrayList));
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.clientSupplier.producers.size()), Matchers.is(1));
        Assert.assertSame(this.clientSupplier.consumer, this.thread.mainConsumer());
        Assert.assertSame(this.clientSupplier.restoreConsumer, this.thread.restoreConsumer());
    }

    @Test
    public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException {
        Assume.assumeFalse(this.stateUpdaterEnabled);
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), new MockTime(1L));
        this.thread.taskManager().handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TP_1_NAME));
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        this.thread.start();
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.STARTING;
        }, 10000L, "Thread never started.");
        this.thread.shutdown();
        Assert.assertFalse(this.thread.isRunning());
        Assert.assertTrue(this.thread.isAlive());
        Thread.sleep(1000L);
        Assert.assertEquals(Utils.mkSet(new TaskId[]{this.task1, this.task2}), this.thread.taskManager().activeTaskIds());
        Assert.assertEquals(StreamThread.State.PENDING_SHUTDOWN, this.thread.state());
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList);
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.DEAD;
        }, 10000L, "Thread never shut down.");
        Assert.assertEquals(Collections.emptySet(), this.thread.taskManager().activeTaskIds());
    }

    @Test
    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), new MockTime(1L));
        this.thread.start();
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.STARTING;
        }, 10000L, "Thread never started.");
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList);
        this.thread.shutdown();
        TestUtils.waitForCondition(() -> {
            return this.thread.state() == StreamThread.State.DEAD;
        }, 10000L, "Thread never shut down.");
        Iterator it = this.thread.readOnlyActiveTasks().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((Task) it.next()).recordCollector().producer().closed());
        }
    }

    @Test
    public void shouldShutdownTaskManagerOnClose() {
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, taskManager, streamsConfig, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        this.thread.setStateListener((thread, threadStateTransitionValidator, threadStateTransitionValidator2) -> {
            if (threadStateTransitionValidator2 == StreamThread.State.CREATED && threadStateTransitionValidator == StreamThread.State.STARTING) {
                this.thread.shutdown();
            }
        });
        this.thread.run();
        ((TaskManager) Mockito.verify(taskManager)).shutdown(true);
    }

    @Test
    public void shouldNotReturnDataAfterTaskMigrated() {
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        InternalTopologyBuilder internalTopologyBuilder = (InternalTopologyBuilder) Mockito.mock(InternalTopologyBuilder.class);
        Mockito.when(internalTopologyBuilder.fullSourceTopicNames()).thenReturn(Collections.singletonList(AssignmentTestUtils.TP_1_NAME));
        final MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.LATEST);
        MockConsumer mockConsumer2 = new MockConsumer(OffsetResetStrategy.EARLIEST);
        mockConsumer.subscribe(Collections.singletonList(AssignmentTestUtils.TP_1_NAME), new MockRebalanceListener());
        mockConsumer.rebalance(Collections.singletonList(this.t1p1));
        mockConsumer.updateEndOffsets(Collections.singletonMap(this.t1p1, 10L));
        mockConsumer.seekToEnd(Collections.singletonList(this.t1p1));
        final TaskMigratedException taskMigratedException = new TaskMigratedException("Changelog restore found task migrated", new RuntimeException("restore task migrated"));
        ChangelogReader changelogReader = this.changelogReader;
        if (this.stateUpdaterEnabled) {
            Mockito.when(Boolean.valueOf(taskManager.checkStateUpdater(ArgumentMatchers.anyLong(), (java.util.function.Consumer) ArgumentMatchers.any()))).thenAnswer(invocationOnMock -> {
                mockConsumer.addRecord(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 11L, new byte[0], new byte[0]));
                mockConsumer.addRecord(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 12L, new byte[1], new byte[0]));
                throw taskMigratedException;
            });
        } else {
            changelogReader = new MockChangelogReader() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.3
                @Override // org.apache.kafka.streams.processor.internals.MockChangelogReader
                public long restore(Map<TaskId, Task> map) {
                    mockConsumer.addRecord(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 11L, new byte[0], new byte[0]));
                    mockConsumer.addRecord(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 12L, new byte[1], new byte[0]));
                    throw taskMigratedException;
                }
            };
        }
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        this.thread = new StreamThread(new MockTime(1L), streamsConfig, (Admin) null, mockConsumer, mockConsumer2, changelogReader, (String) null, taskManager, (StateUpdater) null, streamsMetricsImpl, new TopologyMetadata(internalTopologyBuilder, streamsConfig), CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), (Runnable) null, HANDLER, (java.util.function.Consumer) null).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        StreamThread streamThread = this.thread;
        streamThread.getClass();
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, streamThread::run);
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.isA(IllegalStateException.class));
        Assert.assertEquals("No current assignment for partition topic1-1", assertThrows.getCause().getMessage());
        Assert.assertFalse(mockConsumer.shouldRebalance());
        ((TaskManager) Mockito.verify(taskManager)).handleLostAll();
    }

    @Test
    public void shouldShutdownTaskManagerOnCloseWithoutStart() {
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, taskManager, streamsConfig, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        this.thread.shutdown();
        ((TaskManager) Mockito.verify(taskManager)).shutdown(true);
    }

    @Test
    public void shouldOnlyShutdownOnce() {
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, taskManager, streamsConfig, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        this.thread.shutdown();
        this.thread.run();
        ((TaskManager) Mockito.verify(taskManager)).shutdown(true);
    }

    @Test
    public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TOPIC_PREFIX});
        this.internalTopologyBuilder.addSink("out", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"name"});
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(Collections.emptyMap(), hashMap);
        this.thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
    }

    @Test
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhileProcessing() throws Exception {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.internalTopologyBuilder.addSink("sink", "dummyTopic", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"source"});
        StreamsConfig streamsConfig = new StreamsConfig(configProps(true));
        this.thread = createStreamThread(CLIENT_ID, streamsConfig);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.consumer;
        mockConsumer.updatePartitions(AssignmentTestUtils.TP_1_NAME, Collections.singletonList(new PartitionInfo(AssignmentTestUtils.TP_1_NAME, 1, (Node) null, (Node[]) null, (Node[]) null)));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(arrayList);
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.readOnlyActiveTasks().size()), CoreMatchers.equalTo(1));
        MockProducer<byte[], byte[]> mockProducer = this.clientSupplier.producers.get(0);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(arrayList.iterator().next(), 0L));
        mockConsumer.unsubscribe();
        mockConsumer.assign(new HashSet(arrayList));
        mockConsumer.addRecord(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 0L, new byte[0], new byte[0]));
        if (this.processingThreadsEnabled) {
            Assert.assertTrue(runUntilTimeoutOrCondition(this::runOnce, () -> {
                return !mockProducer.history().isEmpty();
            }));
        } else {
            this.mockTime.sleep(streamsConfig.getLong("commit.interval.ms").longValue() + 1);
            runOnce();
            MatcherAssert.assertThat(Integer.valueOf(mockProducer.history().size()), CoreMatchers.equalTo(1));
        }
        this.mockTime.sleep(streamsConfig.getLong("commit.interval.ms").longValue() + 1);
        TestUtils.waitForCondition(() -> {
            return mockProducer.commitCount() == 1;
        }, "StreamsThread did not commit transaction.");
        mockProducer.fenceProducer();
        this.mockTime.sleep(streamsConfig.getLong("commit.interval.ms").longValue() + 1);
        mockConsumer.addRecord(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 1L, new byte[0], new byte[0]));
        try {
            if (this.processingThreadsEnabled) {
                runUntilTimeoutOrException(this::runOnce);
            } else {
                runOnce();
            }
            Assert.fail("Should have thrown TaskMigratedException");
        } catch (KafkaException e) {
            Assert.assertTrue(String.format("Expected TaskMigratedException but got %s", e), e instanceof TaskMigratedException);
            Assert.assertTrue("StreamsThread removed the fenced zombie task already, should wait for rebalance to close all zombies together.", this.thread.readOnlyActiveTasks().stream().anyMatch(task -> {
                return task.id().equals(this.task1);
            }));
        }
        MatcherAssert.assertThat(Long.valueOf(mockProducer.commitCount()), CoreMatchers.equalTo(1L));
    }

    @Test
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks() throws Exception {
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)));
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.internalTopologyBuilder.addSink("out", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"name"});
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer) this.thread.mainConsumer();
        mockConsumer.assign(arrayList);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.readOnlyActiveTasks().size()), CoreMatchers.equalTo(1));
        addRecord(mockConsumer, 0L);
        MockProducer<byte[], byte[]> mockProducer = this.clientSupplier.producers.get(0);
        runOnce();
        if (this.processingThreadsEnabled) {
            TestUtils.waitForCondition(() -> {
                return !mockProducer.uncommittedRecords().isEmpty();
            }, "Processing threads to process record");
        }
        mockProducer.commitTransactionException = new ProducerFencedException("Producer is fenced");
        Assert.assertThrows(TaskMigratedException.class, () -> {
            this.thread.rebalanceListener().onPartitionsRevoked(arrayList);
        });
        Assert.assertFalse(mockProducer.transactionCommitted());
        Assert.assertFalse(mockProducer.closed());
        Assert.assertEquals(1L, this.thread.readOnlyActiveTasks().size());
    }

    @Test
    public void shouldReinitializeRevivedTasksInAnyState() throws Exception {
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)), new MockTime(1L));
        TopicPartition topicPartition = new TopicPartition("stream-thread-test-store-changelog", 1);
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        this.internalTopologyBuilder.addProcessor("proc", () -> {
            return record -> {
                if (atomicBoolean.get()) {
                    throw new TaskCorruptedException(Collections.singleton(this.task1));
                }
                atomicBoolean2.set(true);
            };
        }, new String[]{"name"});
        this.internalTopologyBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.String(), Serdes.String()), new String[]{"proc"});
        this.internalTopologyBuilder.buildTopology();
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer) this.thread.mainConsumer();
        mockConsumer.assign(arrayList);
        mockConsumer.updateBeginningOffsets(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.t1p1, 0L)}));
        this.thread.restoreConsumer().updateBeginningOffsets(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(topicPartition, 0L)}));
        this.thread.adminClient().updateEndOffsets(Collections.singletonMap(topicPartition, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.readOnlyActiveTasks().size()), CoreMatchers.equalTo(1));
        runOnce();
        if (this.stateUpdaterEnabled) {
            TestUtils.waitForCondition(() -> {
                return this.thread.taskManager().checkStateUpdater(this.mockTime.milliseconds(), set -> {
                    mockConsumer.seekToBeginning(Collections.singleton(this.t1p1));
                });
            }, 10000L, "State updater never returned tasks.");
        }
        addRecord(mockConsumer, 0L);
        atomicBoolean.set(true);
        this.thread.taskManager().handleCorruption((this.processingThreadsEnabled ? (TaskCorruptedException) Assert.assertThrows(TaskCorruptedException.class, () -> {
            runUntilTimeoutOrException(this::runOnce);
        }) : Assert.assertThrows(TaskCorruptedException.class, this::runOnce)).corruptedTasks());
        if (this.stateUpdaterEnabled) {
            TestUtils.waitForCondition(() -> {
                return this.thread.taskManager().checkStateUpdater(this.mockTime.milliseconds(), set -> {
                    mockConsumer.seekToBeginning(Collections.singleton(this.t1p1));
                });
            }, 10000L, "State updater never returned tasks.");
        }
        runOnce();
        runOnce();
        addRecord(mockConsumer, 0L);
        atomicBoolean.set(false);
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean2.get()), Matchers.is(false));
        if (this.processingThreadsEnabled) {
            Runnable runnable = this::runOnce;
            atomicBoolean2.getClass();
            Assert.assertTrue(runUntilTimeoutOrCondition(runnable, atomicBoolean2::get));
        } else {
            runOnce();
            runOnce();
            MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean2.get()), Matchers.is(true));
        }
    }

    @Test
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        StreamsConfig streamsConfig = new StreamsConfig(configProps(true));
        this.thread = createStreamThread(CLIENT_ID, streamsConfig);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.consumer;
        mockConsumer.updatePartitions(AssignmentTestUtils.TP_1_NAME, Collections.singletonList(new PartitionInfo(AssignmentTestUtils.TP_1_NAME, 1, (Node) null, (Node[]) null, (Node[]) null)));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(arrayList);
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.readOnlyActiveTasks().size()), CoreMatchers.equalTo(1));
        MockProducer<byte[], byte[]> mockProducer = this.clientSupplier.producers.get(0);
        mockProducer.commitTransactionException = new ProducerFencedException("Producer is fenced");
        this.mockTime.sleep(streamsConfig.getLong("commit.interval.ms").longValue() + 1);
        mockConsumer.addRecord(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 1L, new byte[0], new byte[0]));
        try {
            if (this.processingThreadsEnabled) {
                runUntilTimeoutOrException(this::runOnce);
            } else {
                runOnce();
            }
            Assert.fail("Should have thrown TaskMigratedException");
        } catch (KafkaException e) {
            Assert.assertTrue(e instanceof TaskMigratedException);
            Assert.assertTrue("StreamsThread removed the fenced zombie task already, should wait for rebalance to close all zombies together.", this.thread.readOnlyActiveTasks().stream().anyMatch(task -> {
                return task.id().equals(this.task1);
            }));
        }
        MatcherAssert.assertThat(Long.valueOf(mockProducer.commitCount()), CoreMatchers.equalTo(0L));
        Assert.assertTrue(this.clientSupplier.producers.get(0).transactionInFlight());
        Assert.assertFalse(this.clientSupplier.producers.get(0).transactionCommitted());
        Assert.assertFalse(this.clientSupplier.producers.get(0).closed());
        Assert.assertEquals(1L, this.thread.readOnlyActiveTasks().size());
    }

    @Test
    public void shouldNotCloseTaskProducerWhenSuspending() throws Exception {
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)));
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.internalTopologyBuilder.addSink("out", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"name"});
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer) this.thread.mainConsumer();
        mockConsumer.assign(arrayList);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList);
        runOnce();
        MatcherAssert.assertThat(Integer.valueOf(this.thread.readOnlyActiveTasks().size()), CoreMatchers.equalTo(1));
        addRecord(mockConsumer, 0L);
        MockProducer<byte[], byte[]> mockProducer = this.clientSupplier.producers.get(0);
        if (this.processingThreadsEnabled) {
            Assert.assertTrue(runUntilTimeoutOrCondition(this::runOnce, () -> {
                return !mockProducer.history().isEmpty();
            }));
        } else {
            runOnce();
        }
        this.thread.rebalanceListener().onPartitionsRevoked(arrayList);
        Assert.assertTrue(mockProducer.transactionCommitted());
        Assert.assertTrue(mockProducer.transactionCommitted());
        Assert.assertFalse(mockProducer.closed());
        Assert.assertEquals(1L, this.thread.readOnlyActiveTasks().size());
    }

    @Test
    public void shouldReturnActiveTaskMetadataWhileRunningState() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.clientSupplier.setCluster(createCluster());
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, streamsConfig.getString("built.in.metrics.version"), this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.stateDirectory = new StateDirectory(streamsConfig, this.mockTime, true, false);
        this.thread = StreamThread.create(topologyMetadata, streamsConfig, this.clientSupplier, this.clientSupplier.getAdmin(streamsConfig.getAdminConfigs(CLIENT_ID)), PROCESS_ID, CLIENT_ID, streamsMetricsImpl, this.mockTime, new StreamsMetadataState(new TopologyMetadata(this.internalTopologyBuilder, streamsConfig), StreamsMetadataState.UNKNOWN_HOST, new LogContext(String.format("stream-client [%s] ", CLIENT_ID))), 0L, this.stateDirectory, new MockStateRestoreListener(), new MockStandbyUpdateListener(), 1, (Runnable) null, HANDLER);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(arrayList);
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList);
        runOnce();
        ThreadMetadata threadMetadata = this.thread.threadMetadata();
        Assert.assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
        Assert.assertTrue(threadMetadata.activeTasks().contains(new TaskMetadataImpl(this.task1, Utils.mkSet(new TopicPartition[]{this.t1p1}), new HashMap(), new HashMap(), Optional.empty())));
        Assert.assertTrue(threadMetadata.standbyTasks().isEmpty());
        Assert.assertTrue("#threadState() was: " + threadMetadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED", Arrays.asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(threadMetadata.threadState()));
        String threadName = threadMetadata.threadName();
        MatcherAssert.assertThat(threadName, CoreMatchers.startsWith(CLIENT_ID + "-StreamThread-1"));
        Assert.assertEquals(threadName + "-consumer", threadMetadata.consumerClientId());
        Assert.assertEquals(threadName + "-restore-consumer", threadMetadata.restoreConsumerClientId());
        Assert.assertEquals(Collections.singleton(threadName + "-producer"), threadMetadata.producerClientIds());
        Assert.assertEquals(CLIENT_ID + "-admin", threadMetadata.adminClientId());
    }

    @Test
    public void shouldReturnStandbyTaskMetadataWhileRunningState() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        this.internalStreamsBuilder.stream(Collections.singleton(AssignmentTestUtils.TP_1_NAME), this.consumed).groupByKey().count(Materialized.as("count-one"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        this.thread = createStreamThread(CLIENT_ID, streamsConfig, new MockTime(1L));
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.restoreConsumer;
        mockConsumer.updatePartitions(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, Collections.singletonList(new PartitionInfo(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 0, (Node) null, new Node[0], new Node[0])));
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 1), 0L);
        mockConsumer.updateEndOffsets(hashMap);
        mockConsumer.updateBeginningOffsets(hashMap);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(Collections.emptyMap(), hashMap2);
        this.thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
        runOnce();
        ThreadMetadata threadMetadata = this.thread.threadMetadata();
        Assert.assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
        Assert.assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadataImpl(this.task1, Utils.mkSet(new TopicPartition[]{this.t1p1}), new HashMap(), new HashMap(), Optional.empty())));
        Assert.assertTrue(threadMetadata.activeTasks().isEmpty());
    }

    @Test
    public void shouldUpdateStandbyTask() throws Exception {
        Assume.assumeFalse(this.stateUpdaterEnabled);
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)));
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.restoreConsumer;
        setupThread("count-one", "table-two", STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG, mockConsumer, false);
        runOnce();
        StandbyTask standbyTask = standbyTask(this.thread.taskManager(), this.t1p1);
        StandbyTask standbyTask2 = standbyTask(this.thread.taskManager(), this.t2p1);
        Assert.assertEquals(this.task1, standbyTask.id());
        Assert.assertEquals(this.task3, standbyTask2.id());
        KeyValueStore store = standbyTask.getStore("count-one");
        KeyValueStore store2 = standbyTask2.getStore("table-two");
        Assert.assertEquals(0L, store.approximateNumEntries());
        Assert.assertEquals(0L, store2.approximateNumEntries());
        addStandbyRecordsToRestoreConsumer(mockConsumer);
        runOnce();
        Assert.assertEquals(10L, store.approximateNumEntries());
        Assert.assertEquals(4L, store2.approximateNumEntries());
    }

    private void addActiveRecordsToRestoreConsumer(MockConsumer<byte[], byte[]> mockConsumer) {
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 10) {
                return;
            }
            mockConsumer.addRecord(new ConsumerRecord(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 2, j2, ("K" + j2).getBytes(), ("V" + j2).getBytes()));
            j = j2 + 1;
        }
    }

    private void addStandbyRecordsToRestoreConsumer(MockConsumer<byte[], byte[]> mockConsumer) {
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 10) {
                return;
            }
            mockConsumer.addRecord(new ConsumerRecord(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 1, j2, ("K" + j2).getBytes(), ("V" + j2).getBytes()));
            mockConsumer.addRecord(new ConsumerRecord(STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG, 1, j2, ("K" + j2).getBytes(), ("V" + j2).getBytes()));
            j = j2 + 1;
        }
    }

    private void setupThread(String str, String str2, String str3, String str4, MockConsumer<byte[], byte[]> mockConsumer, boolean z) throws IOException {
        TopicPartition topicPartition = new TopicPartition(str3, 2);
        TopicPartition topicPartition2 = new TopicPartition(str3, 1);
        TopicPartition topicPartition3 = new TopicPartition(str4, 1);
        this.internalStreamsBuilder.stream(Collections.singleton(AssignmentTestUtils.TP_1_NAME), this.consumed).groupByKey().count(Materialized.as(str));
        this.internalStreamsBuilder.table(AssignmentTestUtils.TP_2_NAME, new ConsumedInternal(), new MaterializedInternal(Materialized.as(str2), this.internalStreamsBuilder, ""));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        mockConsumer.updatePartitions(str3, Collections.singletonList(new PartitionInfo(str3, 1, (Node) null, new Node[0], new Node[0])));
        mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        this.thread.adminClient().updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        this.thread.adminClient().updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
        mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition2, 10L));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition2, 0L));
        mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition3, 10L));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition3, 0L));
        new OffsetCheckpoint(new File(this.stateDirectory.getOrCreateDirectoryForTask(this.task3), ".checkpoint")).write(Collections.singletonMap(topicPartition3, 5L));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        if (z) {
            hashMap.put(this.task2, Collections.singleton(this.t1p2));
        }
        hashMap2.put(this.task1, Collections.singleton(this.t1p1));
        hashMap2.put(this.task3, Collections.singleton(this.t2p1));
        this.thread.taskManager().handleAssignment(hashMap, hashMap2);
        this.thread.taskManager().tryToCompleteRestoration(this.mockTime.milliseconds(), (java.util.function.Consumer) null);
        this.thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
    }

    @Test
    public void shouldNotUpdateStandbyTaskWhenPaused() throws Exception {
        Assume.assumeFalse(this.stateUpdaterEnabled);
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)));
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.restoreConsumer;
        setupThread("count-one", "table-two", STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG, mockConsumer, true);
        runOnce();
        StreamTask activeTask = activeTask(this.thread.taskManager(), this.t1p2);
        StandbyTask standbyTask = standbyTask(this.thread.taskManager(), this.t1p1);
        StandbyTask standbyTask2 = standbyTask(this.thread.taskManager(), this.t2p1);
        Assert.assertEquals(this.task1, standbyTask.id());
        Assert.assertEquals(this.task3, standbyTask2.id());
        KeyValueStore store = activeTask.getStore("count-one");
        KeyValueStore store2 = standbyTask.getStore("count-one");
        KeyValueStore store3 = standbyTask2.getStore("table-two");
        Assert.assertEquals(0L, store.approximateNumEntries());
        Assert.assertEquals(0L, store2.approximateNumEntries());
        Assert.assertEquals(0L, store3.approximateNumEntries());
        addActiveRecordsToRestoreConsumer(mockConsumer);
        addStandbyRecordsToRestoreConsumer(mockConsumer);
        this.thread.taskManager().topologyMetadata().pauseTopology("__UNNAMED_TOPOLOGY__");
        runOnce();
        Assert.assertEquals(0L, store.approximateNumEntries());
        Assert.assertEquals(0L, store2.approximateNumEntries());
        Assert.assertEquals(0L, store3.approximateNumEntries());
        this.thread.taskManager().topologyMetadata().resumeTopology("__UNNAMED_TOPOLOGY__");
        runOnce();
        Assert.assertEquals(10L, store.approximateNumEntries());
        Assert.assertEquals(0L, store2.approximateNumEntries());
        Assert.assertEquals(0L, store3.approximateNumEntries());
        runOnce();
        Assert.assertEquals(10L, store.approximateNumEntries());
        Assert.assertEquals(10L, store2.approximateNumEntries());
        Assert.assertEquals(4L, store3.approximateNumEntries());
    }

    @Test
    public void shouldCreateStandbyTask() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        setupInternalTopologyWithoutState(streamsConfig);
        this.internalTopologyBuilder.addStateStore(new MockKeyValueStoreBuilder("myStore", true), new String[]{"processor1"});
        MatcherAssert.assertThat(createStandbyTask(streamsConfig), CoreMatchers.not(Matchers.empty()));
    }

    @Test
    public void shouldNotCreateStandbyTaskWithoutStateStores() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        setupInternalTopologyWithoutState(streamsConfig);
        MatcherAssert.assertThat(createStandbyTask(streamsConfig), Matchers.empty());
    }

    @Test
    public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        setupInternalTopologyWithoutState(streamsConfig);
        MockKeyValueStoreBuilder mockKeyValueStoreBuilder = new MockKeyValueStoreBuilder("myStore", true);
        mockKeyValueStoreBuilder.withLoggingDisabled();
        this.internalTopologyBuilder.addStateStore(mockKeyValueStoreBuilder, new String[]{"processor1"});
        MatcherAssert.assertThat(createStandbyTask(streamsConfig), Matchers.empty());
    }

    @Test
    public void shouldPunctuateActiveTask() {
        Assume.assumeFalse(this.processingThreadsEnabled);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        this.internalStreamsBuilder.stream(Collections.singleton(AssignmentTestUtils.TP_1_NAME), this.consumed).process(() -> {
            return new ContextualProcessor<Object, Object, Void, Void>() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.4
                public void init(ProcessorContext<Void, Void> processorContext) {
                    Duration ofMillis = Duration.ofMillis(100L);
                    PunctuationType punctuationType = PunctuationType.STREAM_TIME;
                    List list = arrayList;
                    list.getClass();
                    processorContext.schedule(ofMillis, punctuationType, (v1) -> {
                        r3.add(v1);
                    });
                    Duration ofMillis2 = Duration.ofMillis(100L);
                    PunctuationType punctuationType2 = PunctuationType.WALL_CLOCK_TIME;
                    List list2 = arrayList2;
                    list2.getClass();
                    processorContext.schedule(ofMillis2, punctuationType2, (v1) -> {
                        r3.add(v1);
                    });
                }

                public void process(Record<Object, Object> record) {
                }
            };
        }, new String[0]);
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        ArrayList arrayList3 = new ArrayList();
        HashMap hashMap = new HashMap();
        arrayList3.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        this.clientSupplier.consumer.assign(arrayList3);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList3);
        runOnce();
        Assert.assertEquals(0L, arrayList.size());
        Assert.assertEquals(0L, arrayList2.size());
        this.mockTime.sleep(100L);
        this.clientSupplier.consumer.addRecord(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 100L, 100L, TimestampType.CREATE_TIME, "K".getBytes().length, "V".getBytes().length, "K".getBytes(), "V".getBytes(), new RecordHeaders(), Optional.empty()));
        runOnce();
        Assert.assertEquals(1L, arrayList.size());
        Assert.assertEquals(1L, arrayList2.size());
        this.mockTime.sleep(100L);
        runOnce();
        Assert.assertEquals(1L, arrayList.size());
        Assert.assertEquals(2L, arrayList2.size());
    }

    @Test
    public void shouldPunctuateWithTimestampPreservedInProcessorContext() {
        Assume.assumeFalse(this.processingThreadsEnabled);
        TransformerSupplier transformerSupplier = () -> {
            return new Transformer<Object, Object, KeyValue<Object, Object>>() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.5
                public void init(org.apache.kafka.streams.processor.ProcessorContext processorContext) {
                    processorContext.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, j -> {
                        processorContext.forward("key", "value");
                    });
                    processorContext.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, j2 -> {
                        processorContext.forward("key", "value");
                    });
                }

                /* renamed from: transform, reason: merged with bridge method [inline-methods] */
                public KeyValue<Object, Object> m147transform(Object obj, Object obj2) {
                    return null;
                }

                public void close() {
                }
            };
        };
        ArrayList arrayList = new ArrayList();
        this.internalStreamsBuilder.stream(Collections.singleton(AssignmentTestUtils.TP_1_NAME), this.consumed).transform(transformerSupplier, new String[0]).process(() -> {
            return record -> {
                arrayList.add(Long.valueOf(record.timestamp()));
            };
        }, new String[0]);
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        long milliseconds = this.mockTime.milliseconds();
        this.thread = createStreamThread(CLIENT_ID);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        arrayList2.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        this.clientSupplier.consumer.assign(arrayList2);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(arrayList2);
        runOnce();
        Assert.assertEquals(0L, arrayList.size());
        this.mockTime.sleep(100L);
        runOnce();
        Assert.assertEquals(1L, arrayList.size());
        Assert.assertEquals(milliseconds + 100, ((Long) arrayList.get(0)).longValue());
        this.clientSupplier.consumer.addRecord(new ConsumerRecord(AssignmentTestUtils.TP_1_NAME, 1, 110L, 110L, TimestampType.CREATE_TIME, "K".getBytes().length, "V".getBytes().length, "K".getBytes(), "V".getBytes(), new RecordHeaders(), Optional.empty()));
        runOnce();
        Assert.assertEquals(2L, arrayList.size());
        Assert.assertEquals(110L, ((Long) arrayList.get(1)).longValue());
    }

    @Test
    public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)));
        Assert.assertEquals(StreamThread.State.CREATED.name(), this.thread.threadMetadata().threadState());
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.thread.setState(StreamThread.State.RUNNING);
        Assert.assertEquals(StreamThread.State.RUNNING.name(), this.thread.threadMetadata().threadState());
    }

    @Test
    public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
        this.internalStreamsBuilder.stream(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX), this.consumed).groupByKey().count(Materialized.as("count"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        this.thread = createStreamThread("clientId", (Time) new MockTime(1L));
        MockConsumer mainConsumer = this.thread.mainConsumer();
        MockConsumer restoreConsumer = this.thread.restoreConsumer();
        MockAdminClient adminClient = this.thread.adminClient();
        TopicPartition topicPartition = new TopicPartition(AssignmentTestUtils.TOPIC_PREFIX, 0);
        Set singleton = Collections.singleton(topicPartition);
        HashMap hashMap = new HashMap();
        hashMap.put(new TaskId(0, 0), singleton);
        this.thread.taskManager().handleAssignment(hashMap, Collections.emptyMap());
        mainConsumer.updatePartitions(AssignmentTestUtils.TOPIC_PREFIX, Collections.singletonList(new PartitionInfo(AssignmentTestUtils.TOPIC_PREFIX, 0, (Node) null, new Node[0], new Node[0])));
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        mainConsumer.subscribe(Utils.mkSet(new String[]{topicPartition.topic()}));
        mainConsumer.rebalance(Collections.singleton(topicPartition));
        restoreConsumer.updatePartitions("stream-thread-test-count-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-changelog", 0, (Node) null, new Node[0], new Node[0])));
        TopicPartition topicPartition2 = new TopicPartition("stream-thread-test-count-changelog", 0);
        final Set singleton2 = Collections.singleton(topicPartition2);
        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition2, 0L));
        adminClient.updateEndOffsets(Collections.singletonMap(topicPartition2, 2L));
        mainConsumer.schedulePollTask(() -> {
            this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
            this.thread.rebalanceListener().onPartitionsAssigned(singleton);
        });
        this.thread.start();
        TestUtils.waitForCondition(() -> {
            return restoreConsumer.assignment().size() == 1;
        }, "Never get the assignment");
        restoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
        TestUtils.waitForCondition(() -> {
            return restoreConsumer.position(topicPartition2) == 1;
        }, "Never restore first record");
        restoreConsumer.setPollException(new InvalidOffsetException("Try Again!") { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.6
            public Set<TopicPartition> partitions() {
                return singleton2;
            }
        });
        TestUtils.waitForCondition(() -> {
            return restoreConsumer.assignment().size() == 1;
        }, "Never get the assignment");
        TestUtils.waitForCondition(() -> {
            return restoreConsumer.position(topicPartition2) == 0;
        }, "Never restore first record");
        restoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
        restoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 1L, "K2".getBytes(), "V2".getBytes()));
        if (this.stateUpdaterEnabled) {
            TestUtils.waitForCondition(() -> {
                return restoreConsumer.assignment().size() == 0;
            }, "Never get the assignment");
        } else {
            TestUtils.waitForCondition(() -> {
                restoreConsumer.assign(singleton2);
                return restoreConsumer.position(topicPartition2) == 2;
            }, "Never finished restore");
        }
    }

    /* JADX WARN: Type inference failed for: r3v7, types: [long, java.lang.String] */
    @Test
    public void shouldLogAndRecordSkippedMetricForDeserializationException() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        Properties configProps = configProps(false);
        configProps.setProperty("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class.getName());
        configProps.setProperty("default.value.serde", Serdes.Integer().getClass().getName());
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        TaskId taskId = new TaskId(0, this.t1p1.partition());
        Set singleton = Collections.singleton(this.t1p1);
        this.thread.taskManager().handleAssignment(Collections.singletonMap(taskId, singleton), Collections.emptyMap());
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(Collections.singleton(this.t1p1));
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(singleton);
        runOnce();
        ?? r3 = this.t1p1.topic();
        mainConsumer.addRecord(new ConsumerRecord((String) r3, this.t1p1.partition(), (-1) + 1, -1L, TimestampType.CREATE_TIME, -1, -1, new byte[0], "I am not an integer.".getBytes(), new RecordHeaders(), Optional.empty()));
        mainConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), r3 + 1, -1L, TimestampType.CREATE_TIME, -1, -1, new byte[0], "I am not an integer.".getBytes(), new RecordHeaders(), Optional.empty()));
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(RecordDeserializer.class);
        Throwable th = null;
        try {
            try {
                runOnce();
                List messages = createAndRegister.getMessages();
                Assert.assertTrue(messages.contains("stream-thread [" + Thread.currentThread().getName() + "] task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[0]"));
                Assert.assertTrue(messages.contains("stream-thread [" + Thread.currentThread().getName() + "] task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[1]"));
                if (createAndRegister != null) {
                    if (0 == 0) {
                        createAndRegister.close();
                        return;
                    }
                    try {
                        createAndRegister.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAndRegister != null) {
                if (th != null) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldThrowTaskMigratedExceptionHandlingTaskLost() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        Set singleton = Collections.singleton(this.t1p1);
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.LATEST);
        mockConsumer.assign(singleton);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        mockConsumer.updateEndOffsets(Collections.singletonMap(this.t1p1, 10L));
        ((TaskManager) Mockito.doThrow(new Throwable[]{new TaskMigratedException("Task lost exception", new RuntimeException())}).when(taskManager)).handleLostAll();
        new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(mockConsumer, taskManager, streamsConfig, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        mockConsumer.schedulePollTask(() -> {
            this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
            this.thread.rebalanceListener().onPartitionsLost(singleton);
        });
        this.thread.setState(StreamThread.State.STARTING);
        Assert.assertThrows(TaskMigratedException.class, this::runOnce);
    }

    @Test
    public void shouldThrowTaskMigratedExceptionHandlingRevocation() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        Set singleton = Collections.singleton(this.t1p1);
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.LATEST);
        mockConsumer.assign(singleton);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        mockConsumer.updateEndOffsets(Collections.singletonMap(this.t1p1, 10L));
        ((TaskManager) Mockito.doThrow(new Throwable[]{new TaskMigratedException("Revocation non fatal exception", new RuntimeException())}).when(taskManager)).handleRevocation(singleton);
        new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(mockConsumer, taskManager, streamsConfig, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        mockConsumer.schedulePollTask(() -> {
            this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
            this.thread.rebalanceListener().onPartitionsRevoked(singleton);
        });
        this.thread.setState(StreamThread.State.STARTING);
        Assert.assertThrows(TaskMigratedException.class, this::runOnce);
    }

    /* JADX WARN: Type inference failed for: r1v9, types: [org.apache.kafka.streams.processor.internals.StreamThreadTest$7] */
    @Test
    public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        final Set singleton = Collections.singleton(new TaskId(0, 0));
        Mockito.when(Boolean.valueOf(taskManager.handleCorruption(singleton))).thenReturn(true);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread(this.mockTime, streamsConfig, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetricsImpl, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.7
            void runOnceWithProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }

            void runOnceWithoutProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        this.thread.run();
        ((Consumer) Mockito.verify(consumer)).subscribe((Collection) ArgumentMatchers.any(), (ConsumerRebalanceListener) ArgumentMatchers.any());
    }

    /* JADX WARN: Type inference failed for: r1v9, types: [org.apache.kafka.streams.processor.internals.StreamThreadTest$8] */
    @Test
    public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        final Set singleton = Collections.singleton(new TaskId(0, 0));
        ((TaskManager) Mockito.doThrow(new Throwable[]{new TimeoutException()}).when(taskManager)).handleCorruption(singleton);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread(this.mockTime, streamsConfig, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetricsImpl, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.8
            void runOnceWithProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }

            void runOnceWithoutProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.thread.setStreamsUncaughtExceptionHandler((th, bool) -> {
            atomicBoolean.set(true);
        });
        this.thread.run();
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean.get()), Matchers.is(true));
        ((Consumer) Mockito.verify(consumer)).subscribe((Collection) ArgumentMatchers.any(), (ConsumerRebalanceListener) ArgumentMatchers.any());
        ((Consumer) Mockito.verify(consumer)).unsubscribe();
    }

    /* JADX WARN: Type inference failed for: r1v10, types: [org.apache.kafka.streams.processor.internals.StreamThreadTest$9] */
    @Test
    public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        final Set singleton = Collections.singleton(new TaskId(0, 0));
        ((TaskManager) Mockito.doThrow(new Throwable[]{new TaskMigratedException("Task migrated", new RuntimeException("non-corrupted task migrated"))}).when(taskManager)).handleCorruption(singleton);
        ((TaskManager) Mockito.doNothing().when(taskManager)).handleLostAll();
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread(this.mockTime, streamsConfig, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetricsImpl, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.9
            void runOnceWithProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }

            void runOnceWithoutProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.runLoop();
        ((Consumer) Mockito.verify(consumer, Mockito.times(2))).subscribe((Collection) ArgumentMatchers.any(), (ConsumerRebalanceListener) ArgumentMatchers.any());
        ((Consumer) Mockito.verify(consumer)).unsubscribe();
    }

    /* JADX WARN: Type inference failed for: r1v11, types: [org.apache.kafka.streams.processor.internals.StreamThreadTest$10] */
    @Test
    public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(true));
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        final Set singleton = Collections.singleton(new TaskId(0, 0));
        Mockito.when(Boolean.valueOf(taskManager.handleCorruption(singleton))).thenReturn(true);
        ((Consumer) Mockito.doNothing().when(consumer)).enforceRebalance("Active tasks corrupted");
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread(this.mockTime, streamsConfig, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetricsImpl, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.10
            void runOnceWithProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }

            void runOnceWithoutProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.runLoop();
        ((Consumer) Mockito.verify(consumer)).subscribe((Collection) ArgumentMatchers.any(), (ConsumerRebalanceListener) ArgumentMatchers.any());
    }

    /* JADX WARN: Type inference failed for: r1v9, types: [org.apache.kafka.streams.processor.internals.StreamThreadTest$11] */
    @Test
    public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(true));
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        final Set singleton = Collections.singleton(new TaskId(0, 0));
        Mockito.when(Boolean.valueOf(taskManager.handleCorruption(singleton))).thenReturn(false);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread(this.mockTime, streamsConfig, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetricsImpl, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.11
            void runOnceWithProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }

            void runOnceWithoutProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(singleton);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId(CLIENT_ID));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.runLoop();
        ((Consumer) Mockito.verify(consumer)).subscribe((Collection) ArgumentMatchers.any(), (ConsumerRebalanceListener) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotCommitNonRunningNonRestoringTasks() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Task task = (Task) Mockito.mock(Task.class);
        Task task2 = (Task) Mockito.mock(Task.class);
        Task task3 = (Task) Mockito.mock(Task.class);
        TaskId taskId = new TaskId(0, 1);
        TaskId taskId2 = new TaskId(0, 2);
        TaskId taskId3 = new TaskId(0, 3);
        Mockito.when(task.state()).thenReturn(Task.State.RUNNING);
        Mockito.when(task2.state()).thenReturn(Task.State.RESTORING);
        Mockito.when(task3.state()).thenReturn(Task.State.CREATED);
        Mockito.when(taskManager.allOwnedTasks()).thenReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(taskId, task), Utils.mkEntry(taskId2, task2), Utils.mkEntry(taskId3, task3)}));
        Mockito.when(Integer.valueOf(taskManager.commit(Utils.mkSet(new Task[]{task, task2})))).thenReturn(2);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, taskManager, streamsConfig, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager) Mockito.verify(taskManager)).commit(Utils.mkSet(new Task[]{task, task2}));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        Properties configProps = configProps(false);
        configProps.setProperty("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class.getName());
        this.thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        TaskId taskId = new TaskId(0, this.t1p1.partition());
        Set singleton = Collections.singleton(this.t1p1);
        this.thread.taskManager().handleAssignment(Collections.singletonMap(taskId, singleton), Collections.emptyMap());
        MockConsumer mainConsumer = this.thread.mainConsumer();
        mainConsumer.assign(Collections.singleton(this.t1p1));
        mainConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(singleton);
        runOnce();
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(RecordQueue.class);
        Throwable th = null;
        try {
            long j = (-1) + 1;
            addRecord(mainConsumer, j);
            addRecord(mainConsumer, j + 1);
            runOnce();
            addRecord(mainConsumer, this + 1);
            addRecord(mainConsumer, this + 1);
            addRecord(mainConsumer, this + 1);
            addRecord(mainConsumer, this + 1);
            runOnce();
            addRecord(mainConsumer, this + 1, 1L);
            addRecord(mainConsumer, this + 1, 1L);
            runOnce();
            List messages = createAndRegister.getMessages();
            String str = "stream-thread [" + Thread.currentThread().getName() + "] task [0_1] ";
            Assert.assertTrue(messages.contains(str + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[0] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assert.assertTrue(messages.contains(str + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[1] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assert.assertTrue(messages.contains(str + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[2] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assert.assertTrue(messages.contains(str + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[3] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assert.assertTrue(messages.contains(str + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[4] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assert.assertTrue(messages.contains(str + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[5] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            if (createAndRegister != null) {
                if (0 == 0) {
                    createAndRegister.close();
                    return;
                }
                try {
                    createAndRegister.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldTransmitTaskManagerMetrics() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        MetricName metricName = new MetricName("test_metric", "", "", new HashMap());
        Map singletonMap = Collections.singletonMap(metricName, new KafkaMetric(new Object(), metricName, (metricConfig, j) -> {
            return 0.0d;
        }, (MetricConfig) null, new MockTime()));
        Mockito.when(taskManager.producerMetrics()).thenReturn(singletonMap);
        new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = buildStreamThread(this.consumer, taskManager, streamsConfig, topologyMetadata);
        MatcherAssert.assertThat(singletonMap, Matchers.is(this.thread.producerMetrics()));
    }

    @Test
    public void shouldConstructAdminMetrics() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        MockAdminClient build = new MockAdminClient.Builder().brokers(Arrays.asList(new Node(0, "dummyHost-1", 1234), new Node(1, "dummyHost-2", 1234))).clusterId((String) null).build();
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread(this.mockTime, streamsConfig, build, this.consumer, this.consumer, this.changelogReader, (String) null, taskManager, (StateUpdater) null, streamsMetricsImpl, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), (Runnable) null, HANDLER, (java.util.function.Consumer) null);
        MetricName metricName = new MetricName("test_metric", "", "", new HashMap());
        build.setMockMetrics(metricName, new KafkaMetric(new Object(), metricName, (metricConfig, j) -> {
            return 0.0d;
        }, (MetricConfig) null, new MockTime()));
        Assert.assertEquals(metricName, ((Metric) this.thread.adminClientMetrics().get(metricName)).metricName());
    }

    @Test
    public void shouldNotRecordFailedStreamThread() {
        runAndVerifyFailedStreamThreadRecording(false);
    }

    @Test
    public void shouldRecordFailedStreamThread() {
        runAndVerifyFailedStreamThreadRecording(true);
    }

    public void runAndVerifyFailedStreamThreadRecording(final boolean z) {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(false));
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(this.consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread(this.mockTime, streamsConfig, null, this.consumer, this.consumer, this.changelogReader, null, taskManager, null, streamsMetricsImpl, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, (th, bool) -> {
        }, null) { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.12
            void runOnceWithProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                if (z) {
                    throw new StreamsException(Thread.currentThread().getName());
                }
            }

            void runOnceWithoutProcessingThreads() {
                setState(StreamThread.State.PENDING_SHUTDOWN);
                if (z) {
                    throw new StreamsException(Thread.currentThread().getName());
                }
            }
        };
        this.thread.updateThreadMetadata("metadata");
        this.thread.run();
        MatcherAssert.assertThat(StreamsTestUtils.getMetricByName(this.metrics.metrics(), "failed-stream-threads", "stream-metrics").metricValue(), Matchers.is(Double.valueOf(z ? 1.0d : 0.0d)));
    }

    @Test
    public void shouldCheckStateUpdater() {
        Assume.assumeTrue(this.stateUpdaterEnabled);
        this.thread = setUpThread(configProps(false));
        TaskManager taskManager = this.thread.taskManager();
        this.thread.setState(StreamThread.State.STARTING);
        runOnce();
        ((TaskManager) Mockito.verify(taskManager)).checkStateUpdater(ArgumentMatchers.anyLong(), (java.util.function.Consumer) Mockito.any());
        if (this.processingThreadsEnabled) {
            return;
        }
        ((TaskManager) Mockito.verify(taskManager)).process(Mockito.anyInt(), (Time) Mockito.any());
    }

    @Test
    public void shouldCheckStateUpdaterInBetweenProcessCalls() {
        Assume.assumeTrue(this.stateUpdaterEnabled);
        Assume.assumeFalse(this.processingThreadsEnabled);
        this.thread = setUpThread(configProps(false));
        TaskManager taskManager = this.thread.taskManager();
        this.thread.setState(StreamThread.State.STARTING);
        Mockito.when(Integer.valueOf(taskManager.process(Mockito.anyInt(), (Time) Mockito.any()))).thenReturn(1).thenReturn(0);
        runOnce();
        ((TaskManager) Mockito.verify(taskManager, Mockito.times(2))).checkStateUpdater(ArgumentMatchers.anyLong(), (java.util.function.Consumer) Mockito.any());
    }

    @Test
    public void shouldUpdateLagsAfterPolling() {
        this.thread = setUpThread(configProps(false));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.thread.updateThreadMetadata("metadata");
        this.thread.setState(StreamThread.State.RUNNING);
        runOnce();
        InOrder inOrder = Mockito.inOrder(new Object[]{this.mainConsumer, this.thread.taskManager()});
        ((Consumer) inOrder.verify(this.mainConsumer)).poll((Duration) Mockito.any());
        ((TaskManager) inOrder.verify(this.thread.taskManager())).updateLags();
    }

    @Test
    public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling() {
        this.thread = setUpThread(configProps(false));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        runOnce();
        InOrder inOrder = Mockito.inOrder(new Object[]{this.thread.taskManager(), this.mainConsumer});
        ((TaskManager) inOrder.verify(this.thread.taskManager())).resumePollingForPartitionsWithAvailableSpace();
        ((Consumer) inOrder.verify(this.mainConsumer)).poll((Duration) Mockito.any());
    }

    @Test
    public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
        Assume.assumeTrue(this.stateUpdaterEnabled);
        Properties configProps = configProps(false);
        Duration ofMillis = Duration.ofMillis(new StreamsConfig(configProps).getLong("poll.ms").longValue());
        this.thread = setUpThread(configProps);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        runOnce();
        ((Consumer) Mockito.verify(this.mainConsumer)).poll(ofMillis);
    }

    @Test
    public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() {
        Assume.assumeFalse(this.stateUpdaterEnabled);
        this.thread = setUpThread(configProps(false));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        runOnce();
        ((Consumer) Mockito.verify(this.mainConsumer)).poll(Duration.ZERO);
    }

    @Test
    public void shouldGetMainAndRestoreConsumerInstanceId() throws Exception {
        getMainAndRestoreConsumerInstanceId(false);
    }

    @Test
    public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout() throws Exception {
        getMainAndRestoreConsumerInstanceId(true);
    }

    private void getMainAndRestoreConsumerInstanceId(boolean z) throws Exception {
        Uuid randomUuid = Uuid.randomUuid();
        this.clientSupplier.consumer.setClientInstanceId(randomUuid);
        if (z) {
            this.clientSupplier.consumer.injectTimeoutException(1);
        }
        Uuid randomUuid2 = Uuid.randomUuid();
        this.clientSupplier.restoreConsumer.setClientInstanceId(randomUuid2);
        if (z) {
            this.clientSupplier.restoreConsumer.injectTimeoutException(1);
        }
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerClientInstanceIds = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        this.thread.maybeGetClientInstanceIds();
        MatcherAssert.assertThat((Uuid) ((KafkaFuture) consumerClientInstanceIds.get("clientId-StreamThread-1-consumer")).get(), CoreMatchers.equalTo(randomUuid));
        MatcherAssert.assertThat((Uuid) ((KafkaFuture) consumerClientInstanceIds.get("clientId-StreamThread-1-restore-consumer")).get(), CoreMatchers.equalTo(randomUuid2));
    }

    @Test
    public void shouldGetProducerInstanceId() throws Exception {
        getProducerInstanceId(false, false);
    }

    @Test
    public void shouldGetProducerInstanceIdWithInternalTimeout() throws Exception {
        getProducerInstanceId(true, false);
    }

    @Test
    public void shouldNotGetProducerInstanceIdWithEosV1() throws Exception {
        getProducerInstanceId(false, true);
    }

    private void getProducerInstanceId(boolean z, boolean z2) throws Exception {
        Uuid randomUuid = Uuid.randomUuid();
        MockProducer<byte[], byte[]> mockProducer = new MockProducer<>();
        if (!z2) {
            mockProducer.setClientInstanceId(randomUuid);
            if (z) {
                mockProducer.injectTimeoutException(1);
            }
        }
        this.clientSupplier.prepareProducer(mockProducer);
        Properties configProps = configProps(z2);
        if (z2) {
            configProps.put("processing.guarantee", "exactly_once");
        }
        this.thread = createStreamThread("clientId", new StreamsConfig(configProps));
        this.thread.setState(StreamThread.State.STARTING);
        KafkaFuture producersClientInstanceIds = this.thread.producersClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        this.thread.maybeGetClientInstanceIds();
        if (z2) {
            MatcherAssert.assertThat(producersClientInstanceIds.get(), CoreMatchers.equalTo(Collections.emptyMap()));
        } else {
            MatcherAssert.assertThat((Uuid) ((KafkaFuture) ((Map) producersClientInstanceIds.get()).get("clientId-StreamThread-1-producer")).get(), CoreMatchers.equalTo(randomUuid));
        }
    }

    @Test
    public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized() {
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerClientInstanceIds = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture kafkaFuture = (KafkaFuture) consumerClientInstanceIds.get("clientId-StreamThread-1-consumer");
        kafkaFuture.getClass();
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, kafkaFuture::get);
        MatcherAssert.assertThat(executionException.getCause(), IsInstanceOf.instanceOf(UnsupportedOperationException.class));
        MatcherAssert.assertThat(executionException.getCause().getMessage(), CoreMatchers.equalTo("clientInstanceId not set"));
    }

    @Test
    public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized() {
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerClientInstanceIds = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture kafkaFuture = (KafkaFuture) consumerClientInstanceIds.get("clientId-StreamThread-1-restore-consumer");
        kafkaFuture.getClass();
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, kafkaFuture::get);
        MatcherAssert.assertThat(executionException.getCause(), IsInstanceOf.instanceOf(UnsupportedOperationException.class));
        MatcherAssert.assertThat(executionException.getCause().getMessage(), CoreMatchers.equalTo("clientInstanceId not set"));
    }

    @Test
    public void shouldReturnErrorIfProducerInstanceIdNotInitialized() throws Exception {
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map map = (Map) this.thread.producersClientInstanceIds(Duration.ZERO).get();
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture kafkaFuture = (KafkaFuture) map.get("clientId-StreamThread-1-producer");
        kafkaFuture.getClass();
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, kafkaFuture::get);
        MatcherAssert.assertThat(executionException.getCause(), IsInstanceOf.instanceOf(UnsupportedOperationException.class));
        MatcherAssert.assertThat(executionException.getCause().getMessage(), CoreMatchers.equalTo("clientInstanceId not set"));
    }

    @Test
    public void shouldReturnNullIfMainConsumerTelemetryDisabled() throws Exception {
        this.clientSupplier.consumer.disableTelemetry();
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerClientInstanceIds = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        MatcherAssert.assertThat((Uuid) ((KafkaFuture) consumerClientInstanceIds.get("clientId-StreamThread-1-consumer")).get(), CoreMatchers.equalTo((Object) null));
    }

    @Test
    public void shouldReturnNullIfRestoreConsumerTelemetryDisabled() throws Exception {
        this.clientSupplier.restoreConsumer.disableTelemetry();
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerClientInstanceIds = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        MatcherAssert.assertThat((Uuid) ((KafkaFuture) consumerClientInstanceIds.get("clientId-StreamThread-1-restore-consumer")).get(), CoreMatchers.equalTo((Object) null));
    }

    @Test
    public void shouldReturnNullIfProducerTelemetryDisabled() throws Exception {
        MockProducer<byte[], byte[]> mockProducer = new MockProducer<>();
        mockProducer.disableTelemetry();
        this.clientSupplier.prepareProducer(mockProducer);
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map map = (Map) this.thread.producersClientInstanceIds(Duration.ZERO).get();
        this.thread.maybeGetClientInstanceIds();
        MatcherAssert.assertThat((Uuid) ((KafkaFuture) map.get("clientId-StreamThread-1-producer")).get(), CoreMatchers.equalTo((Object) null));
    }

    @Test
    public void shouldTimeOutOnMainConsumerInstanceId() {
        this.clientSupplier.consumer.setClientInstanceId(Uuid.randomUuid());
        this.clientSupplier.consumer.injectTimeoutException(-1);
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerClientInstanceIds = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.mockTime.sleep(1L);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture kafkaFuture = (KafkaFuture) consumerClientInstanceIds.get("clientId-StreamThread-1-consumer");
        kafkaFuture.getClass();
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, kafkaFuture::get);
        MatcherAssert.assertThat(executionException.getCause(), IsInstanceOf.instanceOf(TimeoutException.class));
        MatcherAssert.assertThat(executionException.getCause().getMessage(), CoreMatchers.equalTo("Could not retrieve main consumer client instance id."));
    }

    @Test
    public void shouldTimeOutOnRestoreConsumerInstanceId() {
        this.clientSupplier.restoreConsumer.setClientInstanceId(Uuid.randomUuid());
        this.clientSupplier.restoreConsumer.injectTimeoutException(-1);
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerClientInstanceIds = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.mockTime.sleep(1L);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture kafkaFuture = (KafkaFuture) consumerClientInstanceIds.get("clientId-StreamThread-1-restore-consumer");
        kafkaFuture.getClass();
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, kafkaFuture::get);
        MatcherAssert.assertThat(executionException.getCause(), IsInstanceOf.instanceOf(TimeoutException.class));
        MatcherAssert.assertThat(executionException.getCause().getMessage(), CoreMatchers.equalTo("Could not retrieve restore consumer client instance id."));
    }

    @Test
    public void shouldTimeOutOnProducerInstanceId() throws Exception {
        MockProducer<byte[], byte[]> mockProducer = new MockProducer<>();
        mockProducer.setClientInstanceId(Uuid.randomUuid());
        mockProducer.injectTimeoutException(-1);
        this.clientSupplier.prepareProducer(mockProducer);
        this.thread = createStreamThread("clientId");
        this.thread.setState(StreamThread.State.STARTING);
        Map map = (Map) this.thread.producersClientInstanceIds(Duration.ZERO).get();
        this.mockTime.sleep(1L);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture kafkaFuture = (KafkaFuture) map.get("clientId-StreamThread-1-producer");
        kafkaFuture.getClass();
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, kafkaFuture::get);
        MatcherAssert.assertThat(executionException.getCause(), IsInstanceOf.instanceOf(TimeoutException.class));
        MatcherAssert.assertThat(executionException.getCause().getMessage(), CoreMatchers.equalTo("Could not retrieve thread producer client instance id."));
    }

    private StreamThread setUpThread(Properties properties) {
        StreamsConfig streamsConfig = new StreamsConfig(properties);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata) Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Mockito.when(this.mainConsumer.poll((Duration) Mockito.any(Duration.class))).thenReturn(new ConsumerRecords(Collections.emptyMap()));
        Mockito.when(this.mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, streamsConfig);
        topologyMetadata.buildAndRewriteTopology();
        return new StreamThread(this.mockTime, new StreamsConfig((Map) properties.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))), (Admin) null, this.mainConsumer, (Consumer) null, this.changelogReader, "", taskManager, (StateUpdater) null, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime), topologyMetadata, "thread-id", new LogContext(), (AtomicInteger) null, (AtomicLong) null, new LinkedList(), (Runnable) null, (BiConsumer) null, (java.util.function.Consumer) null);
    }

    private TaskManager mockTaskManager(Task task) {
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        TaskId taskId = new TaskId(0, 0);
        Mockito.when(task.state()).thenReturn(Task.State.RUNNING);
        Mockito.when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, task));
        return taskManager;
    }

    private TaskManager mockTaskManagerPurge() {
        return mockTaskManager((Task) Mockito.mock(Task.class));
    }

    private TaskManager mockTaskManagerCommit(Task task, int i) {
        TaskManager mockTaskManager = mockTaskManager(task);
        Mockito.when(Integer.valueOf(mockTaskManager.commit(Utils.mkSet(new Task[]{task})))).thenReturn(Integer.valueOf(i));
        return mockTaskManager;
    }

    private void setupInternalTopologyWithoutState(StreamsConfig streamsConfig) {
        this.stateDirectory = new StateDirectory(streamsConfig, this.mockTime, true, false);
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TP_1_NAME});
        this.internalTopologyBuilder.addProcessor("processor1", MockApiProcessor::new, new String[]{"source1"});
        this.internalTopologyBuilder.setStreamsConfig(streamsConfig);
    }

    private Collection<Task> createStandbyTask(StreamsConfig streamsConfig) {
        Logger logger = new LogContext("test").logger(StreamThreadTest.class);
        return new StandbyTaskCreator(new TopologyMetadata(this.internalTopologyBuilder, streamsConfig), streamsConfig, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime), this.stateDirectory, new MockChangelogReader(), CLIENT_ID, logger, false).createTasks(Collections.singletonMap(new TaskId(1, 2), Collections.emptySet()));
    }

    private void addRecord(MockConsumer<byte[], byte[]> mockConsumer, long j) {
        addRecord(mockConsumer, j, -1L);
    }

    private void addRecord(MockConsumer<byte[], byte[]> mockConsumer, long j, long j2) {
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), j, j2, TimestampType.CREATE_TIME, -1, -1, new byte[0], new byte[0], new RecordHeaders(), Optional.empty()));
    }

    StreamTask activeTask(TaskManager taskManager, TopicPartition topicPartition) {
        Stream filter = taskManager.allTasks().values().stream().filter((v0) -> {
            return v0.isActive();
        });
        filter.getClass();
        Iterable<StreamTask> iterable = filter::iterator;
        for (StreamTask streamTask : iterable) {
            if (streamTask.inputPartitions().contains(topicPartition)) {
                return streamTask;
            }
        }
        return null;
    }

    StandbyTask standbyTask(TaskManager taskManager, TopicPartition topicPartition) {
        Stream stream = taskManager.standbyTaskMap().values().stream();
        stream.getClass();
        Iterable<StandbyTask> iterable = stream::iterator;
        for (StandbyTask standbyTask : iterable) {
            if (standbyTask.inputPartitions().contains(topicPartition)) {
                return standbyTask;
            }
        }
        return null;
    }

    private StreamThread buildStreamThread(Consumer<byte[], byte[]> consumer, TaskManager taskManager, StreamsConfig streamsConfig, TopologyMetadata topologyMetadata) {
        return new StreamThread(this.mockTime, streamsConfig, (Admin) null, consumer, consumer, this.changelogReader, (String) null, taskManager, (StateUpdater) null, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", this.mockTime), topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), (Runnable) null, HANDLER, (java.util.function.Consumer) null);
    }

    private void runOnce() {
        if (this.processingThreadsEnabled) {
            this.thread.runOnceWithProcessingThreads();
        } else {
            this.thread.runOnceWithoutProcessingThreads();
        }
    }

    private void runUntilTimeoutOrException(Runnable runnable) {
        long currentTimeMillis = System.currentTimeMillis() + 15000;
        while (System.currentTimeMillis() < currentTimeMillis) {
            runnable.run();
            this.mockTime.sleep(10L);
        }
    }

    private boolean runUntilTimeoutOrCondition(Runnable runnable, TestCondition testCondition) throws Exception {
        long currentTimeMillis = System.currentTimeMillis() + 15000;
        while (System.currentTimeMillis() < currentTimeMillis) {
            runnable.run();
            if (testCondition.conditionMet()) {
                return true;
            }
            this.mockTime.sleep(10L);
        }
        return false;
    }
}
