package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockSourceNode;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;

/**
 * Unit tests for {@link AssignedStreamsTasks}.
 *
 * <p>Most tests record expectations on EasyMock {@link StreamTask} mocks, hand the mock to an
 * {@code AssignedStreamsTasks} instance, and then either assert which task-id set the task ended
 * up in or verify that exactly the recorded interactions happened.
 */
public class AssignedStreamsTasksTest {
    /** Object under test; recreated for every test in {@link #before()}. */
    private AssignedStreamsTasks assignedTasks;
    private final StreamTask t1 = EasyMock.createMock(StreamTask.class);
    private final StreamTask t2 = EasyMock.createMock(StreamTask.class);
    private final TopicPartition tp1 = new TopicPartition("t1", 0);
    private final TopicPartition tp2 = new TopicPartition("t2", 0);
    private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
    private final TopicPartition changeLog2 = new TopicPartition("cl2", 0);
    private final TaskId taskId1 = new TaskId(0, 0);
    private final TaskId taskId2 = new TaskId(1, 0);
    /** Output list handed to the suspend/close calls; cleared before each test. */
    private final List<TopicPartition> revokedChangelogs = new ArrayList<>();

    /**
     * Template for the "clear zombie tasks" tests.
     *
     * <p>{@link #createTaskAndClear()} builds a fresh mock task, lets the subclass put it into one
     * particular state, and asserts that {@code closeAllTasksAsZombies()} empties the id set that
     * the subclass exposes through {@link #taskIds()}.
     */
    abstract class TaskTestSuite {
        final TaskId clearingTaskId = new TaskId(0, 0);
        final List<TopicPartition> clearingPartitions = Collections.singletonList(new TopicPartition("topic", 0));

        /** Records any extra expectations on the mock before it is switched to replay mode. */
        abstract void additionalSetup(StreamTask streamTask);

        /** Moves the (already replayed) mock task into the state under test. */
        abstract void action(StreamTask streamTask);

        /** Returns the id set that should contain the task after {@link #action} and be empty after clearing. */
        abstract Set<TaskId> taskIds();

        void createTaskAndClear() {
            final StreamTask streamTask = EasyMock.createMock(StreamTask.class);
            EasyMock.expect(streamTask.id()).andReturn(clearingTaskId).anyTimes();
            EasyMock.expect(streamTask.partitions()).andReturn(Collections.emptySet()).anyTimes();
            EasyMock.expect(streamTask.changelogPartitions()).andReturn(clearingPartitions).anyTimes();
            EasyMock.expect(streamTask.toString(EasyMock.anyString())).andReturn("task").anyTimes();
            additionalSetup(streamTask);
            EasyMock.replay(streamTask);

            action(streamTask);
            Assert.assertEquals(new HashSet<>(Collections.singleton(streamTask.id())), taskIds());

            assignedTasks.closeAllTasksAsZombies();
            Assert.assertEquals(Collections.emptySet(), taskIds());
        }
    }

    @Before
    public void before() {
        assignedTasks = new AssignedStreamsTasks(new LogContext("log "));
        EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes();
        EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
        revokedChangelogs.clear();
    }

    @Test
    public void shouldInitializeNewTasks() {
        t1.initializeMetadata();
        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(t1);

        addAndInitTask();

        EasyMock.verify(t1);
    }

    @Test
    public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
        // t1 reports false from initializeStateStores() and is expected to end up restoring.
        t1.initializeMetadata();
        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
        t1.initializeTopology();
        EasyMock.expectLastCall().once();
        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        // t2 reports true and therefore must not show up among the restoring tasks.
        t2.initializeMetadata();
        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
        t2.initializeTopology();
        EasyMock.expectLastCall().once();
        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
        EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.replay(t1, t2);

        assignedTasks.addNewTask(t1);
        assignedTasks.addNewTask(t2);
        assignedTasks.initializeNewTasks();

        final Collection<StreamTask> restoring = assignedTasks.restoringTasks();
        MatcherAssert.assertThat(restoring.size(), IsEqual.equalTo(1));
        // Expected value first so a failure message reports the two objects the right way round.
        Assert.assertSame(t1, restoring.iterator().next());
    }

    @Test
    public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
        t2.initializeMetadata();
        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
        t2.initializeTopology();
        EasyMock.expectLastCall().once();
        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
        EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.replay(t2);

        assignedTasks.addNewTask(t2);
        assignedTasks.initializeNewTasks();

        MatcherAssert.assertThat(assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.singleton(taskId2)));
    }

    @Test
    public void shouldTransitionFullyRestoredTasksToRunning() {
        final Set<TopicPartition> partitions = Utils.mkSet(tp1);
        t1.initializeMetadata();
        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
        EasyMock.expect(t1.partitions()).andReturn(partitions).anyTimes();
        EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
        EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
        t1.initializeTopology();
        EasyMock.expectLastCall().once();
        EasyMock.replay(t1);

        addAndInitTask();

        // Only one of the two changelogs restored: the task must not be running yet.
        assignedTasks.updateRestored(Utils.mkSet(changeLog1));
        MatcherAssert.assertThat(assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.emptySet()));
        // Second changelog restored: now the task is expected to be running.
        assignedTasks.updateRestored(Utils.mkSet(changeLog2));
        MatcherAssert.assertThat(assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.singleton(taskId1)));
    }

    @Test
    public void shouldSuspendRunningTasks() {
        mockRunningTaskSuspension();
        EasyMock.replay(t1);

        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        MatcherAssert.assertThat(assignedTasks.suspendedTaskIds(), IsEqual.equalTo(Collections.singleton(taskId1)));
        EasyMock.verify(t1);
    }

    @Test
    public void shouldCloseRestoringTasks() {
        t1.initializeMetadata();
        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        t1.closeStateManager(true);
        EasyMock.expectLastCall();
        EasyMock.replay(t1);

        addAndInitTask();

        MatcherAssert.assertThat(
            assignedTasks.closeRestoringTasks(assignedTasks.restoringTaskIds(), revokedChangelogs),
            CoreMatchers.nullValue());
        EasyMock.verify(t1);
    }

    @Test
    public void shouldCloseUnInitializedTasksOnSuspend() {
        EasyMock.expect(t1.partitions()).andAnswer(Collections::emptySet).anyTimes();
        EasyMock.expect(t1.changelogPartitions()).andAnswer(Collections::emptyList).anyTimes();
        t1.close(false, false);
        EasyMock.expectLastCall();
        EasyMock.replay(t1);

        // The task is added but never initialized before the suspend/close request.
        assignedTasks.addNewTask(t1);

        MatcherAssert.assertThat(
            assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(), revokedChangelogs),
            CoreMatchers.nullValue());
        EasyMock.verify(t1);
    }

    @Test
    public void shouldNotSuspendSuspendedTasks() {
        // suspend() is recorded exactly once; a second suspend() would fail the mock.
        mockRunningTaskSuspension();
        EasyMock.replay(t1);

        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        MatcherAssert.assertThat(
            assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(), revokedChangelogs),
            CoreMatchers.nullValue());
        EasyMock.verify(t1);
    }

    @Test
    public void shouldCloseTaskOnSuspendWhenRuntimeException() {
        mockTaskInitialization();
        t1.suspend();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
        t1.close(false, false);
        EasyMock.expectLastCall();
        EasyMock.replay(t1);

        MatcherAssert.assertThat(suspendTask(), CoreMatchers.not(CoreMatchers.nullValue()));
        Assert.assertTrue(assignedTasks.runningTaskIds().isEmpty());
        Assert.assertTrue(assignedTasks.suspendedTaskIds().isEmpty());
        EasyMock.verify(t1);
    }

    @Test
    public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
        mockTaskInitialization();
        t1.suspend();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
        t1.close(false, true);
        EasyMock.expectLastCall().andThrow(new RuntimeException("any exception"));
        EasyMock.replay(t1);

        // Neither the migration nor the failure of the follow-up close is reported to the caller.
        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        Assert.assertTrue(assignedTasks.runningTaskIds().isEmpty());
        EasyMock.verify(t1);
    }

    @Test
    public void shouldCloseUncleanAndThenRethrowOnShutdownIfRuntimeException() {
        mockTaskInitialization();
        t1.close(true, false);
        EasyMock.expectLastCall().andThrow(new RuntimeException("any first exception"));
        t1.close(false, false);
        EasyMock.expectLastCall().andThrow(new RuntimeException("any second exception"));
        EasyMock.replay(t1);
        addAndInitTask();

        final RuntimeException thrown = Assert.assertThrows(
            "expected a runtime exception",
            RuntimeException.class,
            () -> assignedTasks.shutdown(true));
        // The exception from the first (clean) close attempt is the one that must surface.
        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.is("any first exception"));
    }

    @Test
    public void shouldCloseWithoutExceptionOnShutdownIfTaskMigratedException() {
        mockTaskInitialization();
        t1.close(true, false);
        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
        t1.close(false, true);
        EasyMock.expectLastCall().andThrow(new RuntimeException("any second exception"));
        EasyMock.replay(t1);
        addAndInitTask();

        // Passes as long as shutdown does not propagate either exception.
        assignedTasks.shutdown(true);
    }

    @Test
    public void shouldResumeMatchingSuspendedTasks() {
        mockRunningTaskSuspension();
        t1.resume();
        EasyMock.expectLastCall();
        t1.initializeTopology();
        EasyMock.expectLastCall().once();
        EasyMock.replay(t1);

        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        Assert.assertTrue(assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1)));
        MatcherAssert.assertThat(assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.singleton(taskId1)));
        EasyMock.verify(t1);
    }

    @Test
    public void shouldNotCloseTaskWithinResumeSuspendedIfTaskMigratedException() {
        mockRunningTaskSuspension();
        t1.resume();
        t1.initializeTopology();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
        EasyMock.replay(t1);

        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        verifyTaskMigratedExceptionDoesNotCloseTask(
            () -> assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1)));
    }

    /**
     * Records on {@code t1} the calls made while initializing a task whose
     * {@code initializeStateStores()} returns {@code true} (the case the tests above show going
     * straight to running). The caller still has to record test-specific expectations and replay.
     */
    private void mockTaskInitialization() {
        t1.initializeMetadata();
        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
        t1.initializeTopology();
        EasyMock.expectLastCall().once();
        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
    }

    @Test
    public void shouldCommitRunningTasks() {
        mockTaskInitialization();
        EasyMock.expect(t1.commitNeeded()).andReturn(true);
        t1.commit();
        EasyMock.expectLastCall();
        EasyMock.replay(t1);

        addAndInitTask();
        assignedTasks.commit();

        EasyMock.verify(t1);
    }

    @Test
    public void shouldNotCloseTaskWithinCommitIfTaskMigratedException() {
        mockTaskInitialization();
        EasyMock.expect(t1.commitNeeded()).andReturn(true);
        t1.commit();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
        EasyMock.replay(t1);

        addAndInitTask();

        verifyTaskMigratedExceptionDoesNotCloseTask(() -> assignedTasks.commit());
    }

    @Test
    public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() {
        mockTaskInitialization();
        EasyMock.expect(t1.commitNeeded()).andReturn(true);
        t1.commit();
        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
        EasyMock.replay(t1);

        addAndInitTask();

        // Any Exception is acceptable here; what matters is that it propagates and the task stays running.
        Assert.assertThrows("Should have thrown exception", Exception.class, () -> assignedTasks.commit());
        MatcherAssert.assertThat(assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.singleton(taskId1)));
        EasyMock.verify(t1);
    }

    @Test
    public void shouldCommitRunningTasksIfNeeded() {
        mockTaskInitialization();
        EasyMock.expect(t1.commitRequested()).andReturn(true);
        EasyMock.expect(t1.commitNeeded()).andReturn(true);
        t1.commit();
        EasyMock.expectLastCall();
        EasyMock.replay(t1);

        addAndInitTask();

        MatcherAssert.assertThat(assignedTasks.maybeCommitPerUserRequested(), IsEqual.equalTo(1));
        EasyMock.verify(t1);
    }

    @Test
    public void shouldNotCloseTaskWithinMaybeCommitIfTaskMigratedException() {
        mockTaskInitialization();
        EasyMock.expect(t1.commitRequested()).andReturn(true);
        EasyMock.expect(t1.commitNeeded()).andReturn(true);
        t1.commit();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
        EasyMock.replay(t1);

        addAndInitTask();

        verifyTaskMigratedExceptionDoesNotCloseTask(() -> assignedTasks.maybeCommitPerUserRequested());
    }

    @Test
    public void shouldNotCloseTaskWithinProcessIfTaskMigratedException() {
        mockTaskInitialization();
        EasyMock.expect(t1.isProcessable(0L)).andReturn(true);
        t1.process();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
        EasyMock.replay(t1);

        addAndInitTask();

        verifyTaskMigratedExceptionDoesNotCloseTask(() -> assignedTasks.process(0L));
    }

    @Test
    public void shouldNotProcessUnprocessableTasks() {
        mockTaskInitialization();
        EasyMock.expect(t1.isProcessable(0L)).andReturn(false);
        EasyMock.replay(t1);

        addAndInitTask();

        MatcherAssert.assertThat(assignedTasks.process(0L), IsEqual.equalTo(0));
        EasyMock.verify(t1);
    }

    @Test
    public void shouldAlwaysProcessProcessableTasks() {
        mockTaskInitialization();
        EasyMock.expect(t1.isProcessable(0L)).andReturn(true);
        EasyMock.expect(t1.process()).andReturn(true).once();
        EasyMock.replay(t1);

        addAndInitTask();

        MatcherAssert.assertThat(assignedTasks.process(0L), IsEqual.equalTo(1));
        EasyMock.verify(t1);
    }

    @Test
    public void shouldPunctuateRunningTasks() {
        mockTaskInitialization();
        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
        EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(true);
        EasyMock.replay(t1);

        addAndInitTask();

        // Both the stream-time and the system-time punctuation report true, hence 2.
        MatcherAssert.assertThat(assignedTasks.punctuate(), IsEqual.equalTo(2));
        EasyMock.verify(t1);
    }

    @Test
    public void shouldNotCloseTaskWithinMaybePunctuateStreamTimeIfTaskMigratedException() {
        mockTaskInitialization();
        t1.maybePunctuateStreamTime();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
        EasyMock.replay(t1);

        addAndInitTask();

        verifyTaskMigratedExceptionDoesNotCloseTask(() -> assignedTasks.punctuate());
    }

    // NOTE(review): the method name is missing the 'C' of "Close"; kept unchanged so that any
    // existing test filters or reports that reference it keep matching.
    @Test
    public void shouldNotloseTaskWithinMaybePunctuateSystemTimeIfTaskMigratedException() {
        mockTaskInitialization();
        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
        t1.maybePunctuateSystemTime();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
        EasyMock.replay(t1);

        addAndInitTask();

        verifyTaskMigratedExceptionDoesNotCloseTask(() -> assignedTasks.punctuate());
    }

    @Test
    public void shouldReturnNumberOfPunctuations() {
        mockTaskInitialization();
        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
        EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(false);
        EasyMock.replay(t1);

        addAndInitTask();

        MatcherAssert.assertThat(assignedTasks.punctuate(), IsEqual.equalTo(1));
        EasyMock.verify(t1);
    }

    /**
     * Uses a real {@link StreamTask} (built on mock consumer/producer) instead of an EasyMock mock:
     * the task is initialized, suspended, and then shut down cleanly. The config comes from
     * {@code StreamTaskTest.createConfig(true)}.
     */
    @Test
    public void shouldCloseCleanlyWithSuspendedTaskAndEOS() {
        final Deserializer<byte[]> deserializer = Serdes.ByteArray().deserializer();
        final Serializer<byte[]> serializer = Serdes.ByteArray().serializer();
        final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final MockProducer<byte[], byte[]> mockProducer = new MockProducer<>(false, serializer, serializer);
        final MockSourceNode<byte[], byte[]> mockSourceNode =
            new MockSourceNode<>(new String[]{"topic"}, deserializer, deserializer);
        final MockChangelogReader mockChangelogReader = new MockChangelogReader();
        final ProcessorTopology processorTopology = new ProcessorTopology(
            Collections.singletonList(mockSourceNode),
            Collections.singletonMap("topic", mockSourceNode),
            Collections.emptyMap(),
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyMap(),
            Collections.emptySet());
        final Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("topic", 1));
        final MockStreamsMetrics mockStreamsMetrics =
            new MockStreamsMetrics(new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG)));
        final MockTime mockTime = new MockTime();

        assignedTasks.addNewTask(new StreamTask(
            new TaskId(0, 0),
            partitions,
            processorTopology,
            mockConsumer,
            mockChangelogReader,
            StreamTaskTest.createConfig(true),
            mockStreamsMetrics,
            new StateDirectory(StreamTaskTest.createConfig(true), mockTime, true),
            (ThreadCache) null,
            mockTime,
            () -> mockProducer));
        assignedTasks.initializeNewTasks();

        Assert.assertNull(assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(), revokedChangelogs));
        assignedTasks.shutdown(true);
    }

    @Test
    public void shouldClearZombieCreatedTasks() {
        new TaskTestSuite() {
            @Override
            public void additionalSetup(final StreamTask streamTask) {
                streamTask.close(false, true);
            }

            @Override
            public void action(final StreamTask streamTask) {
                assignedTasks.addNewTask(streamTask);
            }

            @Override
            public Set<TaskId> taskIds() {
                return assignedTasks.created.keySet();
            }
        }.createTaskAndClear();
    }

    @Test
    public void shouldClearZombieRestoringTasks() {
        new TaskTestSuite() {
            @Override
            public void additionalSetup(final StreamTask streamTask) {
                streamTask.closeStateManager(false);
            }

            @Override
            public void action(final StreamTask streamTask) {
                assignedTasks.addTaskToRestoring(streamTask);
            }

            @Override
            public Set<TaskId> taskIds() {
                return assignedTasks.restoringTaskIds();
            }
        }.createTaskAndClear();
    }

    @Test
    public void shouldClearZombieRunningTasks() {
        new TaskTestSuite() {
            @Override
            public void additionalSetup(final StreamTask streamTask) {
                streamTask.initializeTopology();
                streamTask.close(false, true);
            }

            @Override
            public void action(final StreamTask streamTask) {
                assignedTasks.transitionToRunning(streamTask);
            }

            @Override
            public Set<TaskId> taskIds() {
                return assignedTasks.runningTaskIds();
            }
        }.createTaskAndClear();
    }

    @Test
    public void shouldClearZombieSuspendedTasks() {
        new TaskTestSuite() {
            @Override
            public void additionalSetup(final StreamTask streamTask) {
                streamTask.initializeTopology();
                streamTask.suspend();
                streamTask.closeSuspended(false, (RuntimeException) null);
            }

            @Override
            public void action(final StreamTask streamTask) {
                assignedTasks.transitionToRunning(streamTask);
                final List<TopicPartition> revoked = new ArrayList<>();
                assignedTasks.suspendOrCloseTasks(new HashSet<>(Collections.singletonList(streamTask.id())), revoked);
                // Suspending must report the task's changelog partitions through the output list.
                Assert.assertEquals(clearingPartitions, revoked);
            }

            @Override
            public Set<TaskId> taskIds() {
                return assignedTasks.suspendedTaskIds();
            }
        }.createTaskAndClear();
    }

    /** Adds {@code t1} as a new task and runs the initialization pass over new tasks. */
    private void addAndInitTask() {
        assignedTasks.addNewTask(t1);
        assignedTasks.initializeNewTasks();
    }

    /**
     * Adds and initializes {@code t1}, then asks for every assigned task to be suspended or closed.
     *
     * @return whatever {@code suspendOrCloseTasks} returns; the tests treat {@code null} as "no error"
     */
    private RuntimeException suspendTask() {
        addAndInitTask();
        return assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(), revokedChangelogs);
    }

    /**
     * Records on {@code t1} the initialization calls of a task that goes straight to running,
     * followed by exactly one successful {@code suspend()}.
     */
    private void mockRunningTaskSuspension() {
        t1.initializeMetadata();
        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
        t1.initializeTopology();
        EasyMock.expectLastCall().once();
        EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
        t1.suspend();
        EasyMock.expectLastCall();
    }

    /**
     * Asserts that {@code throwingRunnable} throws {@link TaskMigratedException}, that {@code t1}
     * is still listed as running afterwards, and that all expectations recorded on {@code t1}
     * were met (so no unrecorded close call happened).
     */
    private void verifyTaskMigratedExceptionDoesNotCloseTask(final ThrowingRunnable throwingRunnable) {
        final Set<TaskId> expectedRunningTaskIds = Collections.singleton(taskId1);
        Assert.assertThrows(TaskMigratedException.class, throwingRunnable);
        MatcherAssert.assertThat(assignedTasks.runningTaskIds(), IsEqual.equalTo(expectedRunningTaskIds));
        EasyMock.verify(t1);
    }
}
