package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;

@RunWith(EasyMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/TaskManagerTest.class */
public class TaskManagerTest {

    @Mock(type = MockType.STRICT)
    private InternalTopologyBuilder topologyBuilder;

    @Mock(type = MockType.DEFAULT)
    private StateDirectory stateDirectory;

    @Mock(type = MockType.NICE)
    private ChangelogReader changeLogReader;

    @Mock(type = MockType.STRICT)
    private Consumer<byte[], byte[]> consumer;

    @Mock(type = MockType.STRICT)
    private ActiveTaskCreator activeTaskCreator;

    @Mock(type = MockType.NICE)
    private StandbyTaskCreator standbyTaskCreator;

    @Mock(type = MockType.NICE)
    private Admin adminClient;
    private TaskManager taskManager;
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final TaskId taskId00 = new TaskId(0, 0);
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final Set<TopicPartition> taskId00Partitions = Utils.mkSet(new TopicPartition[]{this.t1p0});
    private final Map<TaskId, Set<TopicPartition>> taskId00Assignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
    private final TaskId taskId01 = new TaskId(0, 1);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final Set<TopicPartition> taskId01Partitions = Utils.mkSet(new TopicPartition[]{this.t1p1});
    private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = Collections.singletonMap(this.taskId01, this.taskId01Partitions);
    private final TaskId taskId02 = new TaskId(0, 2);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final Set<TopicPartition> taskId02Partitions = Utils.mkSet(new TopicPartition[]{this.t1p2});
    private final TaskId taskId03 = new TaskId(0, 3);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final Set<TopicPartition> taskId03Partitions = Utils.mkSet(new TopicPartition[]{this.t1p3});
    private final TaskId taskId04 = new TaskId(0, 4);
    private final TopicPartition t1p4 = new TopicPartition("topic1", 4);
    private final Set<TopicPartition> taskId04Partitions = Utils.mkSet(new TopicPartition[]{this.t1p4});
    private final TaskId taskId05 = new TaskId(0, 5);
    private final TopicPartition t1p5 = new TopicPartition("topic1", 5);
    private final Set<TopicPartition> taskId05Partitions = Utils.mkSet(new TopicPartition[]{this.t1p5});
    private final TaskId taskId10 = new TaskId(1, 0);
    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private final Set<TopicPartition> taskId10Partitions = Utils.mkSet(new TopicPartition[]{this.t2p0});
    private final Time time = new MockTime();

    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/TaskManagerTest$StateMachineTask.class */
    public static class StateMachineTask extends AbstractTask implements Task {
        private final boolean active;
        private boolean commitNeeded;
        private boolean commitRequested;
        private boolean commitPrepared;
        private Map<TopicPartition, OffsetAndMetadata> committableOffsets;
        private Map<TopicPartition, Long> purgeableOffsets;
        private Map<TopicPartition, Long> changelogOffsets;
        private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>> queue;

        StateMachineTask(TaskId taskId, Set<TopicPartition> set, boolean z) {
            this(taskId, set, z, null);
        }

        StateMachineTask(TaskId taskId, Set<TopicPartition> set, boolean z, ProcessorStateManager processorStateManager) {
            super(taskId, (ProcessorTopology) null, (StateDirectory) null, processorStateManager, set);
            this.commitNeeded = false;
            this.commitRequested = false;
            this.commitPrepared = false;
            this.committableOffsets = Collections.emptyMap();
            this.changelogOffsets = Collections.emptyMap();
            this.queue = new HashMap();
            this.active = z;
        }

        public void initializeIfNeeded() {
            if (state() == Task.State.CREATED) {
                transitionTo(Task.State.RESTORING);
                if (this.active) {
                    return;
                }
                transitionTo(Task.State.RUNNING);
            }
        }

        public void completeRestoration() {
            if (state() == Task.State.RUNNING) {
                return;
            }
            transitionTo(Task.State.RUNNING);
        }

        public void setCommitNeeded() {
            this.commitNeeded = true;
        }

        public boolean commitNeeded() {
            return this.commitNeeded;
        }

        public void setCommitRequested() {
            this.commitRequested = true;
        }

        public boolean commitRequested() {
            return this.commitRequested;
        }

        public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
            this.commitPrepared = true;
            return this.committableOffsets;
        }

        public void postCommit() {
            this.commitNeeded = false;
        }

        public void suspend() {
            if (state() == Task.State.CLOSED) {
                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + this.id);
            }
            if (state() == Task.State.SUSPENDED) {
                return;
            }
            transitionTo(Task.State.SUSPENDED);
        }

        public void resume() {
            if (state() == Task.State.SUSPENDED) {
                transitionTo(Task.State.RUNNING);
            }
        }

        public void closeClean() {
            transitionTo(Task.State.CLOSED);
        }

        public void closeDirty() {
            transitionTo(Task.State.CLOSED);
        }

        public void closeCleanAndRecycleState() {
            transitionTo(Task.State.CLOSED);
        }

        public void update(Set<TopicPartition> set, Map<String, List<String>> map) {
            this.inputPartitions = set;
        }

        void setCommittableOffsetsAndMetadata(Map<TopicPartition, OffsetAndMetadata> map) {
            if (!this.active) {
                throw new IllegalStateException("Cannot set CommittableOffsetsAndMetadate for StandbyTasks");
            }
            this.committableOffsets = map;
        }

        public StateStore getStore(String str) {
            return null;
        }

        public Collection<TopicPartition> changelogPartitions() {
            return this.changelogOffsets.keySet();
        }

        public boolean isActive() {
            return this.active;
        }

        void setPurgeableOffsets(Map<TopicPartition, Long> map) {
            this.purgeableOffsets = map;
        }

        public Map<TopicPartition, Long> purgeableOffsets() {
            return this.purgeableOffsets;
        }

        void setChangelogOffsets(Map<TopicPartition, Long> map) {
            this.changelogOffsets = map;
        }

        public Map<TopicPartition, Long> changelogOffsets() {
            return this.changelogOffsets;
        }

        public void addRecords(TopicPartition topicPartition, Iterable<ConsumerRecord<byte[], byte[]>> iterable) {
            if (!isActive()) {
                throw new IllegalStateException("Can't add records to an inactive task.");
            }
            LinkedList<ConsumerRecord<byte[], byte[]>> computeIfAbsent = this.queue.computeIfAbsent(topicPartition, topicPartition2 -> {
                return new LinkedList();
            });
            Iterator<ConsumerRecord<byte[], byte[]>> it = iterable.iterator();
            while (it.hasNext()) {
                computeIfAbsent.add(it.next());
            }
        }

        public boolean process(long j) {
            if (!isActive() || state() != Task.State.RUNNING) {
                throw new IllegalStateException("Can't process an inactive or non-running task.");
            }
            Iterator<LinkedList<ConsumerRecord<byte[], byte[]>>> it = this.queue.values().iterator();
            while (it.hasNext()) {
                if (it.next().poll() != null) {
                    return true;
                }
            }
            return false;
        }
    }

    @Before
    public void setUp() {
        setUpTaskManager(StreamThread.ProcessingMode.AT_LEAST_ONCE);
    }

    private void setUpTaskManager(StreamThread.ProcessingMode processingMode) {
        this.taskManager = new TaskManager(this.changeLogReader, UUID.randomUUID(), "taskManagerTest", this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.adminClient, this.stateDirectory, processingMode);
        this.taskManager.setMainConsumer(this.consumer);
    }

    @Test
    public void shouldIdempotentlyUpdateSubscriptionFromActiveAssignment() {
        TopicPartition topicPartition = new TopicPartition("topic2", 1);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId01, Utils.mkSet(new TopicPartition[]{this.t1p1, topicPartition}))});
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Collections.emptyList()).anyTimes();
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.eq(Arrays.asList(this.t1p1, topicPartition)), EasyMock.anyString());
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.topologyBuilder});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator, this.topologyBuilder});
    }

    @Test
    public void shouldNotLockAnythingIfStateDirIsEmpty() {
        EasyMock.expect(this.stateDirectory.listNonEmptyTaskDirectories()).andReturn(new File[0]).once();
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        EasyMock.verify(new Object[]{this.stateDirectory});
        Assert.assertTrue(this.taskManager.lockedTaskDirectories().isEmpty());
    }

    @Test
    public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
        expectLockObtainedFor(this.taskId01);
        expectLockFailedFor(this.taskId10);
        makeTaskFolders(this.taskId01.toString(), this.taskId10.toString(), "dummy");
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        EasyMock.verify(new Object[]{this.stateDirectory});
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Collections.singleton(this.taskId01)));
    }

    @Test
    public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
        expectLockObtainedFor(this.taskId00, this.taskId01, this.taskId02);
        expectUnlockFor(this.taskId02);
        makeTaskFolders(this.taskId00.toString(), this.taskId01.toString(), this.taskId02.toString());
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01, this.taskId02})));
        handleAssignment(this.taskId00Assignment, this.taskId01Assignment, Collections.emptyMap());
        EasyMock.reset(new Object[]{this.consumer});
        expectConsumerAssignmentPaused(this.consumer);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleRebalanceComplete();
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01})));
        EasyMock.verify(new Object[]{this.stateDirectory});
    }

    @Test
    public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
        computeOffsetSumAndVerify(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), -2L), Utils.mkEntry(new TopicPartition("changelog", 1), -2L)}), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, -2L)}));
    }

    @Test
    public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception {
        computeOffsetSumAndVerify(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), 5L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)}), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 15L)}));
    }

    @Test
    public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {
        computeOffsetSumAndVerify(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), -4L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)}), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 10L)}));
    }

    private void computeOffsetSumAndVerify(Map<TopicPartition, Long> map, Map<TaskId, Long> map2) throws Exception {
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        handleAssignment(Collections.emptyMap(), Collections.emptyMap(), this.taskId00Assignment).get(this.taskId00).setChangelogOffsets(map);
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(map2));
    }

    @Test
    public void shouldComputeOffsetSumForStandbyTask() throws Exception {
        Map<TopicPartition, Long> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), 5L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 15L)});
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        handleAssignment(Collections.emptyMap(), this.taskId00Assignment, Collections.emptyMap()).get(this.taskId00).setChangelogOffsets(mkMap);
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(mkMap2));
    }

    @Test
    public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws Exception {
        Map<TopicPartition, Long> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), 5L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 15L)});
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        writeCheckpointFile(this.taskId00, mkMap);
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(mkMap2));
    }

    @Test
    public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
        expectLockFailedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        Assert.assertTrue(this.taskManager.lockedTaskDirectories().isEmpty());
        Assert.assertTrue(this.taskManager.getTaskOffsetSums().isEmpty());
    }

    @Test
    public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception {
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        EasyMock.expect(this.stateDirectory.checkpointFileFor(this.taskId00)).andReturn(getCheckpointFile(this.taskId00));
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        Assert.assertTrue(this.taskManager.getTaskOffsetSums().isEmpty());
        EasyMock.verify(new Object[]{this.stateDirectory});
    }

    @Test
    public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception {
        Map<TopicPartition, Long> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 1), 4611686018427387903L), Utils.mkEntry(new TopicPartition("changelog", 2), 4611686018427387903L), Utils.mkEntry(new TopicPartition("changelog", 3), 4611686018427387903L)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, Long.MAX_VALUE)});
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        writeCheckpointFile(this.taskId00, mkMap);
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(mkMap2));
    }

    @Test
    public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(Collections.emptyMap()))).andReturn(Collections.emptyList()).anyTimes();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.anyObject())).andReturn(Collections.emptyList()).anyTimes();
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.1
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void closeClean() {
                throw new RuntimeException("KABOOM!");
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(Collections.emptyMap()))).andReturn(Collections.emptyList()).anyTimes();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.anyObject())).andReturn(Collections.emptyList()).anyTimes();
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        this.taskManager.handleRevocation(this.taskId00Partitions);
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(runtimeException.getMessage(), Matchers.is("Unexpected failure to close 1 task(s) [[0_0]]. First unexpected exception (for task 0_0) follows."));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), Matchers.is("KABOOM!"));
    }

    @Test
    public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(this.taskId01Assignment))).andReturn(Collections.singletonList(stateMachineTask2)).anyTimes();
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        makeTaskFolders(this.taskId00.toString(), this.taskId01.toString());
        expectLockObtainedFor(this.taskId00, this.taskId01);
        makeTaskFolders(new String[0]);
        expectLockObtainedFor(new TaskId[0]);
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01})));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleLostAll();
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.is(Collections.singletonMap(this.taskId01, stateMachineTask2)));
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01})));
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Collections.emptySet()));
    }

    @Test
    public void shouldReInitializeThreadProducerOnHandleLostAllIfEosBetaEnabled() {
        this.activeTaskCreator.reInitializeThreadProducer();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator});
        setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_BETA);
        this.taskManager.handleLostAll();
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.anyObject())).andReturn(Collections.emptyList()).anyTimes();
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        });
        MatcherAssert.assertThat(runtimeException.getMessage(), Matchers.is("Unexpected failure to close 1 task(s) [[0_0]]. First unexpected exception (for task 0_0) follows."));
        MatcherAssert.assertThat(runtimeException.getCause(), Matchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), Matchers.is("KABOOM!"));
    }

    @Test
    public void shouldReviveCorruptTasks() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) EasyMock.createStrictMock(ProcessorStateManager.class);
        processorStateManager.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay(new Object[]{processorStateManager});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect(this.consumer.assignment()).andReturn(this.taskId00Partitions);
        this.consumer.pause(this.taskId00Partitions);
        EasyMock.expectLastCall();
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
        EasyMock.expect(this.consumer.committed(this.taskId00Partitions)).andReturn(Collections.singletonMap(this.t1p0, offsetAndMetadata));
        this.consumer.seek(this.t1p0, offsetAndMetadata);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.setPartitionResetter(set -> {
            MatcherAssert.assertThat(set, Matchers.is(Matchers.empty()));
        });
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleCorruption(Collections.singletonMap(this.taskId00, this.taskId00Partitions));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.is(Collections.singletonMap(this.taskId00, stateMachineTask)));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{processorStateManager});
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) EasyMock.createStrictMock(ProcessorStateManager.class);
        processorStateManager.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay(new Object[]{processorStateManager});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.2
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect(this.consumer.assignment()).andReturn(this.taskId00Partitions);
        this.consumer.pause(this.taskId00Partitions);
        EasyMock.expectLastCall();
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
        EasyMock.expect(this.consumer.committed(this.taskId00Partitions)).andReturn(Collections.singletonMap(this.t1p0, offsetAndMetadata));
        this.consumer.seek(this.t1p0, offsetAndMetadata);
        EasyMock.expectLastCall();
        this.taskManager.setPartitionResetter(set -> {
            MatcherAssert.assertThat(set, Matchers.is(Matchers.empty()));
        });
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleCorruption(Collections.singletonMap(this.taskId00, this.taskId00Partitions));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.is(Collections.singletonMap(this.taskId00, stateMachineTask)));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{processorStateManager});
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) EasyMock.createStrictMock(ProcessorStateManager.class);
        processorStateManager.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay(new Object[]{processorStateManager});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager);
        HashMap hashMap = new HashMap(this.taskId00Assignment);
        hashMap.putAll(this.taskId01Assignment);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(hashMap))).andStubReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        this.consumer.commitSync((Map) EasyMock.eq(Collections.emptyMap()));
        EasyMock.expect(this.consumer.assignment()).andReturn(this.taskId00Partitions);
        this.consumer.pause(this.taskId00Partitions);
        EasyMock.expectLastCall();
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
        EasyMock.expect(this.consumer.committed(this.taskId00Partitions)).andReturn(Collections.singletonMap(this.t1p0, offsetAndMetadata));
        this.consumer.seek(this.t1p0, offsetAndMetadata);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.setPartitionResetter(set -> {
            MatcherAssert.assertThat(set, Matchers.is(Matchers.empty()));
        });
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask2.setCommitNeeded();
        this.taskManager.handleCorruption(Collections.singletonMap(this.taskId00, this.taskId00Partitions));
        Assert.assertTrue(stateMachineTask2.commitPrepared);
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldNotCommitNonRunningNonCorruptedTasks() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) EasyMock.createStrictMock(ProcessorStateManager.class);
        processorStateManager.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay(new Object[]{processorStateManager});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager);
        stateMachineTask2.setCommitNeeded();
        HashMap hashMap = new HashMap(this.taskId00Assignment);
        hashMap.putAll(this.taskId01Assignment);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(hashMap))).andStubReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect(this.consumer.assignment()).andReturn(this.taskId00Partitions);
        this.consumer.pause(this.taskId00Partitions);
        EasyMock.expectLastCall();
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
        EasyMock.expect(this.consumer.committed(this.taskId00Partitions)).andReturn(Collections.singletonMap(this.t1p0, offsetAndMetadata));
        this.consumer.seek(this.t1p0, offsetAndMetadata);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.setPartitionResetter(set -> {
            MatcherAssert.assertThat(set, Matchers.is(Matchers.empty()));
        });
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.handleCorruption(Collections.singletonMap(this.taskId00, this.taskId00Partitions));
        EasyMock.verify(new Object[]{this.activeTaskCreator});
        Assert.assertFalse(stateMachineTask2.commitPrepared);
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) EasyMock.createStrictMock(ProcessorStateManager.class);
        processorStateManager.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay(new Object[]{processorStateManager});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false, processorStateManager);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.3
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new TaskMigratedException("You dropped out of the group!", new RuntimeException());
            }
        };
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singleton(stateMachineTask));
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId01Assignment))).andStubReturn(Collections.singleton(stateMachineTask2));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List) EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId01Assignment, this.taskId00Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask2.setCommitNeeded();
        Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.handleCorruption(Collections.singletonMap(this.taskId00, this.taskId00Partitions));
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldAddNonResumedSuspendedTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask));
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(this.taskId01Assignment))).andReturn(Collections.singletonList(stateMachineTask2));
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldUpdateInputPartitionsAfterRebalance() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader, false);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Set mkSet = Utils.mkSet(new TopicPartition[]{this.t1p1});
        this.taskManager.handleAssignment(Collections.singletonMap(this.taskId00, mkSet), Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Assert.assertEquals(mkSet, stateMachineTask.inputPartitions());
        EasyMock.verify(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
    }

    @Test
    public void shouldAddNewActiveTasks() {
        Map<TaskId, Set<TopicPartition>> map = this.taskId00Assignment;
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection) EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        this.changeLogReader.enforceRestoreActive();
        EasyMock.expectLastCall();
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(map))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(Collections.emptyMap()))).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.replay(new Object[]{this.consumer, this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        this.taskManager.handleAssignment(map, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration();
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Collections.singletonMap(this.taskId00, stateMachineTask)));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldNotCompleteRestorationIfTasksCannotInitialize() {
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.4
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void initializeIfNeeded() {
                throw new LockException("can't lock");
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.5
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void initializeIfNeeded() {
                throw new TimeoutException("timed out");
            }
        };
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection) EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        this.changeLogReader.enforceRestoreActive();
        EasyMock.expectLastCall();
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Arrays.asList(stateMachineTask, stateMachineTask2)).anyTimes();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(Collections.emptyMap()))).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.replay(new Object[]{this.consumer, this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(false));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask), Utils.mkEntry(this.taskId01, stateMachineTask2)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() {
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.6
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void completeRestoration() {
                throw new TimeoutException("timeout!");
            }
        };
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection) EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        this.changeLogReader.enforceRestoreActive();
        EasyMock.expectLastCall();
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(Collections.emptyMap()))).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.replay(new Object[]{this.consumer, this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(false));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldSuspendActiveTasksDuringRevocation() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask));
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
    }

    @Test
    public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosBeta() {
        StreamsProducer streamsProducer = (StreamsProducer) EasyMock.mock(StreamsProducer.class);
        setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_BETA);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null));
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap2);
        stateMachineTask2.setCommitNeeded();
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        stateMachineTask3.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, (String) null)));
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        hashMap.putAll(singletonMap2);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId10, this.taskId10Partitions)});
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3));
        EasyMock.expect(this.activeTaskCreator.threadProducer()).andReturn(streamsProducer);
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(mkMap2))).andReturn(Collections.singletonList(stateMachineTask4));
        ConsumerGroupMetadata consumerGroupMetadata = new ConsumerGroupMetadata("appId");
        EasyMock.expect(this.consumer.groupMetadata()).andReturn(consumerGroupMetadata);
        streamsProducer.commitTransaction(hashMap, consumerGroupMetadata);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, mkMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask3.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask4.commitPrepared), Matchers.is(false));
    }

    @Test
    public void shouldCommitOnlyRevokedActiveTasksThatNeedCommittingOnHandleRevocation() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        stateMachineTask2.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null)));
        stateMachineTask2.setCommitNeeded();
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        stateMachineTask3.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, (String) null)));
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId10, this.taskId10Partitions)});
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(mkMap2))).andReturn(Collections.singletonList(stateMachineTask4));
        this.consumer.commitSync(hashMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, mkMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask3.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask4.commitPrepared), Matchers.is(false));
    }

    @Test
    public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        stateMachineTask.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null)));
        stateMachineTask.setCommitNeeded();
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        Map singletonMap = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        Map singletonMap2 = Collections.singletonMap(this.taskId10, this.taskId10Partitions);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(singletonMap))).andReturn(Collections.singleton(stateMachineTask));
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(singletonMap2))).andReturn(Collections.singletonList(stateMachineTask2));
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(singletonMap, singletonMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleAssignment(singletonMap, singletonMap2);
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(false));
    }

    @Test
    public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        stateMachineTask.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null)));
        stateMachineTask.setCommitNeeded();
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        Map singletonMap = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        Map singletonMap2 = Collections.singletonMap(this.taskId10, this.taskId10Partitions);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(singletonMap))).andReturn(Collections.singleton(stateMachineTask));
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(singletonMap2))).andReturn(Collections.singletonList(stateMachineTask2));
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(singletonMap, singletonMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleAssignment(singletonMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(true));
    }

    @Test
    public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId00));
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
    }

    @Test
    public void shouldPassUpIfExceptionDuringSuspend() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.7
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("KABOOM!");
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleRevocation(this.taskId00Partitions);
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown() {
        final TopicPartition topicPartition = new TopicPartition("changelog", 0);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions), Utils.mkEntry(this.taskId03, this.taskId03Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.8
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(topicPartition);
            }
        };
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean3 = new AtomicBoolean(false);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.9
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }

            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void closeDirty() {
                super.closeDirty();
                atomicBoolean.set(true);
            }
        };
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.10
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }

            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void closeDirty() {
                super.closeDirty();
                atomicBoolean2.set(true);
            }
        };
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId03, this.taskId03Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.11
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }

            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void closeDirty() {
                super.closeDirty();
                atomicBoolean3.set(true);
            }
        };
        EasyMock.resetToStrict(new Object[]{this.changeLogReader});
        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3, stateMachineTask4)).anyTimes();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId00));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId01));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId02));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId03));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(Collections.emptyMap()))).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration();
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask), Utils.mkEntry(this.taskId01, stateMachineTask2), Utils.mkEntry(this.taskId02, stateMachineTask3), Utils.mkEntry(this.taskId03, stateMachineTask4)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.shutdown(true);
        });
        MatcherAssert.assertThat(runtimeException.getMessage(), IsEqual.equalTo("Unexpected exception while closing task"));
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean.get()), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean2.get()), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean3.get()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(runtimeException.getMessage(), Matchers.is("Unexpected exception while closing task"));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), Matchers.is("oops"));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator, this.changeLogReader});
    }

    @Test
    public void shouldCloseActiveTasksAndPropagateTaskProducerExceptionsOnCleanShutdown() {
        final TopicPartition topicPartition = new TopicPartition("changelog", 0);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.12
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(topicPartition);
            }
        };
        stateMachineTask.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null)));
        EasyMock.resetToStrict(new Object[]{this.changeLogReader});
        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId00));
        EasyMock.expectLastCall().andThrow(new RuntimeException("whatever"));
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(Collections.emptyMap()))).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration();
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.shutdown(true);
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(runtimeException.getMessage(), Matchers.is("Unexpected exception while closing task"));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), Matchers.is("whatever"));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator, this.changeLogReader});
    }

    @Test
    public void shouldCloseActiveTasksAndPropagateThreadProducerExceptionsOnCleanShutdown() {
        final TopicPartition topicPartition = new TopicPartition("changelog", 0);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.13
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(topicPartition);
            }
        };
        EasyMock.resetToStrict(new Object[]{this.changeLogReader});
        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId00));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall().andThrow(new RuntimeException("whatever"));
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(Collections.emptyMap()))).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration();
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.shutdown(true);
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(runtimeException.getMessage(), Matchers.is("Unexpected exception while closing task"));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), Matchers.is("whatever"));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator, this.changeLogReader});
    }

    @Test
    public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() {
        setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.14
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("task 0_1 prepare commit boom!");
            }
        };
        stateMachineTask2.setCommitNeeded();
        this.taskManager.tasks().put(this.taskId00, stateMachineTask);
        this.taskManager.tasks().put(this.taskId01, stateMachineTask2);
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.singletonMap(this.taskId00, this.taskId00Partitions));
        })).getCause().getMessage(), Matchers.is("task 0_1 prepare commit boom!"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.tasks(), Matchers.is(Collections.singletonMap(this.taskId00, stateMachineTask)));
    }

    @Test
    public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
        setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.15
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("task 0_1 suspend boom!");
            }
        };
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        this.taskManager.tasks().put(this.taskId00, stateMachineTask);
        this.taskManager.tasks().put(this.taskId01, stateMachineTask2);
        this.taskManager.tasks().put(this.taskId02, stateMachineTask3);
        EasyMock.replay(new Object[]{this.activeTaskCreator});
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleRevocation(Utils.union(HashSet::new, new Set[]{this.taskId01Partitions, this.taskId02Partitions}));
        })).getCause().getMessage(), Matchers.is("task 0_1 suspend boom!"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.SUSPENDED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.SUSPENDED));
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldCloseActiveTasksAndIgnoreExceptionsOnUncleanShutdown() {
        final TopicPartition topicPartition = new TopicPartition("changelog", 0);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.16
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(topicPartition);
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.17
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.18
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }
        };
        EasyMock.resetToStrict(new Object[]{this.changeLogReader});
        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3)).anyTimes();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId00));
        EasyMock.expectLastCall().andThrow(new RuntimeException("whatever 0"));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId01));
        EasyMock.expectLastCall().andThrow(new RuntimeException("whatever 1"));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId02));
        EasyMock.expectLastCall().andThrow(new RuntimeException("whatever 2"));
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall().andThrow(new RuntimeException("whatever all"));
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(Collections.emptyMap()))).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration();
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask), Utils.mkEntry(this.taskId01, stateMachineTask2), Utils.mkEntry(this.taskId02, stateMachineTask3)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        this.taskManager.shutdown(false);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator, this.changeLogReader});
    }

    @Test
    public void shouldCloseStandbyTasksOnShutdown() {
        Map singletonMap = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(singletonMap))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection) EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer, this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        this.taskManager.handleAssignment(Collections.emptyMap(), singletonMap);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration();
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.equalTo(Collections.singletonMap(this.taskId00, stateMachineTask)));
        this.taskManager.shutdown(true);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldInitializeNewActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Collections.singletonMap(this.taskId00, stateMachineTask)));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldInitializeNewStandbyTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(this.taskId01Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.equalTo(Collections.singletonMap(this.taskId01, stateMachineTask)));
    }

    @Test
    public void shouldHandleRebalanceEvents() {
        Set singleton = Collections.singleton(new TopicPartition("assignment", 0));
        EasyMock.expect(this.consumer.assignment()).andReturn(singleton);
        this.consumer.pause(singleton);
        EasyMock.expectLastCall();
        EasyMock.expect(this.stateDirectory.listNonEmptyTaskDirectories()).andReturn(new File[0]);
        EasyMock.replay(new Object[]{this.consumer, this.stateDirectory});
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.isRebalanceInProgress()), Matchers.is(false));
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.isRebalanceInProgress()), Matchers.is(true));
        this.taskManager.handleRebalanceComplete();
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.isRebalanceInProgress()), Matchers.is(false));
    }

    @Test
    public void shouldCommitActiveAndStandbyTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(this.taskId01Assignment))).andReturn(Collections.singletonList(stateMachineTask2)).anyTimes();
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        stateMachineTask2.setCommitNeeded();
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commitAll()), IsEqual.equalTo(2));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(false));
    }

    @Test
    public void shouldCommitProvidedTasksIfNeeded() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId03, this.taskId03Partitions, false);
        StateMachineTask stateMachineTask5 = new StateMachineTask(this.taskId04, this.taskId04Partitions, false);
        StateMachineTask stateMachineTask6 = new StateMachineTask(this.taskId05, this.taskId05Partitions, false);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId03, this.taskId03Partitions), Utils.mkEntry(this.taskId04, this.taskId04Partitions), Utils.mkEntry(this.taskId05, this.taskId05Partitions)});
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3)).anyTimes();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(mkMap2))).andReturn(Arrays.asList(stateMachineTask4, stateMachineTask5, stateMachineTask6)).anyTimes();
        this.consumer.commitSync((Map) EasyMock.eq(Collections.emptyMap()));
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, mkMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        stateMachineTask2.setCommitNeeded();
        stateMachineTask4.setCommitNeeded();
        stateMachineTask5.setCommitNeeded();
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commit(Arrays.asList(stateMachineTask, stateMachineTask3, stateMachineTask4, stateMachineTask6))), IsEqual.equalTo(2));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask3.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask4.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask5.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask6.commitNeeded), Matchers.is(false));
    }

    @Test
    public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commitAll()), IsEqual.equalTo(1));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
    }

    @Test
    public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throws Exception {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        makeTaskFolders(this.taskId00.toString(), stateMachineTask2.toString());
        expectLockObtainedFor(this.taskId00, this.taskId01);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(this.taskId01Assignment))).andReturn(Collections.singletonList(stateMachineTask2)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.stateDirectory, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        stateMachineTask2.setCommitNeeded();
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commitAll()), IsEqual.equalTo(-1));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.maybeCommitActiveTasksPerUserRequested()), IsEqual.equalTo(-1));
    }

    @Test
    public void shouldCommitViaConsumerIfEosDisabled() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        this.taskManager.tasks().put(this.taskId01, stateMachineTask);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.commitAll();
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldCommitViaProducerIfEosAlphaEnabled() {
        StreamsProducer streamsProducer = (StreamsProducer) EasyMock.mock(StreamsProducer.class);
        EasyMock.expect(this.activeTaskCreator.streamsProducerForTask(this.taskId01)).andReturn(streamsProducer);
        EasyMock.expect(this.activeTaskCreator.streamsProducerForTask(this.taskId02)).andReturn(streamsProducer);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, (String) null));
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(1L, (String) null));
        streamsProducer.commitTransaction(singletonMap, new ConsumerGroupMetadata("appId"));
        EasyMock.expectLastCall();
        streamsProducer.commitTransaction(singletonMap2, new ConsumerGroupMetadata("appId"));
        EasyMock.expectLastCall();
        shouldCommitViaProducerIfEosEnabled(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA, streamsProducer, singletonMap, singletonMap2);
    }

    @Test
    public void shouldCommitViaProducerIfEosBetaEnabled() {
        StreamsProducer streamsProducer = (StreamsProducer) EasyMock.mock(StreamsProducer.class);
        EasyMock.expect(this.activeTaskCreator.threadProducer()).andReturn(streamsProducer);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, (String) null));
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(1L, (String) null));
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        hashMap.putAll(singletonMap2);
        streamsProducer.commitTransaction(hashMap, new ConsumerGroupMetadata("appId"));
        EasyMock.expectLastCall();
        shouldCommitViaProducerIfEosEnabled(StreamThread.ProcessingMode.EXACTLY_ONCE_BETA, streamsProducer, singletonMap, singletonMap2);
    }

    private void shouldCommitViaProducerIfEosEnabled(StreamThread.ProcessingMode processingMode, StreamsProducer streamsProducer, Map<TopicPartition, OffsetAndMetadata> map, Map<TopicPartition, OffsetAndMetadata> map2) {
        setUpTaskManager(processingMode);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        stateMachineTask.setCommittableOffsetsAndMetadata(map);
        stateMachineTask.setCommitNeeded();
        this.taskManager.tasks().put(this.taskId01, stateMachineTask);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        stateMachineTask2.setCommittableOffsetsAndMetadata(map2);
        stateMachineTask2.setCommitNeeded();
        this.taskManager.tasks().put(this.taskId02, stateMachineTask2);
        EasyMock.reset(new Object[]{this.consumer});
        EasyMock.expect(this.consumer.groupMetadata()).andReturn(new ConsumerGroupMetadata("appId")).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, streamsProducer});
        this.taskManager.commitAll();
        EasyMock.verify(new Object[]{streamsProducer, this.consumer});
    }

    @Test
    public void shouldPropagateExceptionFromActiveCommit() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.19
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("opsh.");
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.commitAll();
        })).getMessage(), IsEqual.equalTo("opsh."));
    }

    @Test
    public void shouldPropagateExceptionFromStandbyCommit() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.20
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("opsh.");
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(this.taskId01Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.commitAll();
        })).getMessage(), IsEqual.equalTo("opsh."));
    }

    @Test
    public void shouldSendPurgeData() {
        EasyMock.resetToStrict(new Object[]{this.adminClient});
        EasyMock.expect(this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(5L)))).andReturn(new DeleteRecordsResult(Collections.singletonMap(this.t1p1, completedFuture())));
        EasyMock.expect(this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(17L)))).andReturn(new DeleteRecordsResult(Collections.singletonMap(this.t1p1, completedFuture())));
        EasyMock.replay(new Object[]{this.adminClient});
        final HashMap hashMap = new HashMap();
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.21
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, Long> purgeableOffsets() {
                return hashMap;
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        hashMap.put(this.t1p1, 5L);
        this.taskManager.maybePurgeCommittedRecords();
        hashMap.put(this.t1p1, 17L);
        this.taskManager.maybePurgeCommittedRecords();
        EasyMock.verify(new Object[]{this.adminClient});
    }

    @Test
    public void shouldNotSendPurgeDataIfPreviousNotDone() {
        EasyMock.resetToStrict(new Object[]{this.adminClient});
        EasyMock.expect(this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(5L)))).andReturn(new DeleteRecordsResult(Collections.singletonMap(this.t1p1, new KafkaFutureImpl())));
        EasyMock.replay(new Object[]{this.adminClient});
        final HashMap hashMap = new HashMap();
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.22
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, Long> purgeableOffsets() {
                return hashMap;
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        hashMap.put(this.t1p1, 5L);
        this.taskManager.maybePurgeCommittedRecords();
        hashMap.put(this.t1p1, 17L);
        this.taskManager.maybePurgeCommittedRecords();
        EasyMock.verify(new Object[]{this.adminClient});
    }

    @Test
    public void shouldIgnorePurgeDataErrors() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(this.t1p1, kafkaFutureImpl));
        kafkaFutureImpl.completeExceptionally(new Exception("KABOOM!"));
        EasyMock.expect(this.adminClient.deleteRecords((Map) EasyMock.anyObject())).andReturn(deleteRecordsResult).times(2);
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.adminClient, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setPurgeableOffsets(Collections.singletonMap(this.t1p1, 5L));
        this.taskManager.maybePurgeCommittedRecords();
        this.taskManager.maybePurgeCommittedRecords();
        EasyMock.verify(new Object[]{this.adminClient});
    }

    @Test
    public void shouldMaybeCommitAllActiveTasksThatNeedCommit() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null));
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap2);
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        stateMachineTask3.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, (String) null)));
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId03, this.taskId03Partitions, true);
        StateMachineTask stateMachineTask5 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        hashMap.putAll(singletonMap2);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions), Utils.mkEntry(this.taskId03, this.taskId03Partitions)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId10, this.taskId10Partitions)});
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(mkMap))).andReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3, stateMachineTask4)).anyTimes();
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(mkMap2))).andReturn(Collections.singletonList(stateMachineTask5)).anyTimes();
        this.consumer.commitSync(hashMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(mkMap, mkMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask5.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        stateMachineTask.setCommitRequested();
        stateMachineTask2.setCommitNeeded();
        stateMachineTask3.setCommitRequested();
        stateMachineTask4.setCommitNeeded();
        stateMachineTask4.setCommitRequested();
        stateMachineTask5.setCommitNeeded();
        stateMachineTask5.setCommitRequested();
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.maybeCommitActiveTasksPerUserRequested()), IsEqual.equalTo(3));
    }

    @Test
    public void shouldProcessActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        HashMap hashMap = new HashMap();
        hashMap.put(this.taskId00, this.taskId00Partitions);
        hashMap.put(this.taskId01, this.taskId01Partitions);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(hashMap))).andReturn(Arrays.asList(stateMachineTask, stateMachineTask2)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.addRecords(this.t1p0, Arrays.asList(getConsumerRecord(this.t1p0, 0L), getConsumerRecord(this.t1p0, 1L), getConsumerRecord(this.t1p0, 2L), getConsumerRecord(this.t1p0, 3L), getConsumerRecord(this.t1p0, 4L), getConsumerRecord(this.t1p0, 5L)));
        stateMachineTask2.addRecords(this.t1p1, Arrays.asList(getConsumerRecord(this.t1p1, 0L), getConsumerRecord(this.t1p1, 1L), getConsumerRecord(this.t1p1, 2L), getConsumerRecord(this.t1p1, 3L), getConsumerRecord(this.t1p1, 4L)));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.process(3, this.time)), Matchers.is(6));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.process(3, this.time)), Matchers.is(5));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.process(3, this.time)), Matchers.is(0));
    }

    @Test
    public void shouldPropagateTaskMigratedExceptionsInProcessActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.23
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public boolean process(long j) {
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        TopicPartition next = this.taskId00Partitions.iterator().next();
        stateMachineTask.addRecords(next, Collections.singletonList(getConsumerRecord(next, 0L)));
        Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.process(1, this.time);
        });
    }

    @Test
    public void shouldPropagateRuntimeExceptionsInProcessActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.24
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public boolean process(long j) {
                throw new RuntimeException("oops");
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        TopicPartition next = this.taskId00Partitions.iterator().next();
        stateMachineTask.addRecords(next, Collections.singletonList(getConsumerRecord(next, 0L)));
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.process(1, this.time);
        })).getMessage(), Matchers.is("oops"));
    }

    @Test
    public void shouldPropagateTaskMigratedExceptionsInPunctuateActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.25
            public boolean maybePunctuateStreamTime() {
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.punctuate();
        });
    }

    @Test
    public void shouldPropagateKafkaExceptionsInPunctuateActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.26
            public boolean maybePunctuateStreamTime() {
                throw new KafkaException("oops");
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Assert.assertThrows(KafkaException.class, () -> {
            this.taskManager.punctuate();
        });
    }

    @Test
    public void shouldPunctuateActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.27
            public boolean maybePunctuateStreamTime() {
                return true;
            }

            public boolean maybePunctuateSystemTime() {
                return true;
            }
        };
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.punctuate()), IsEqual.equalTo(2));
    }

    @Test
    public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.28
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(new TopicPartition("fake", 0));
            }
        };
        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask)).anyTimes();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.changeLogReader, this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(false));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldHaveRemainingPartitionsUncleared() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(stateMachineTask));
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(TaskManager.class);
        Throwable th = null;
        try {
            try {
                this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
                MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
                MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
                this.taskManager.handleRevocation(Utils.mkSet(new TopicPartition[]{this.t1p0, new TopicPartition("unknown", 0)}));
                MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
                MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("taskManagerTestThe following partitions [unknown-0] are missing from the task partitions. It could potentially due to race condition of consumer detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback."));
                if (createAndRegister != null) {
                    if (0 == 0) {
                        createAndRegister.close();
                        return;
                    }
                    try {
                        createAndRegister.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAndRegister != null) {
                if (th != null) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.29
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId02, this.taskId02Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.30
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t2 close exception", new RuntimeException());
            }
        };
        this.taskManager.tasks().put(this.taskId01, stateMachineTask);
        this.taskManager.tasks().put(this.taskId02, stateMachineTask2);
        MatcherAssert.assertThat(Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        }).getMessage(), IsEqual.equalTo("t1 close exception; it means all tasks belonging to this thread should be migrated."));
    }

    @Test
    public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.31
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId02, this.taskId02Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.32
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new IllegalStateException("t2 illegal state exception", new RuntimeException());
            }
        };
        this.taskManager.tasks().put(this.taskId01, stateMachineTask);
        this.taskManager.tasks().put(this.taskId02, stateMachineTask2);
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        });
        MatcherAssert.assertThat(runtimeException.getMessage(), IsEqual.equalTo("Unexpected failure to close 2 task(s) [[0_1, 0_2]]. First unexpected exception (for task 0_2) follows."));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), IsEqual.equalTo("t2 illegal state exception"));
    }

    @Test
    public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.33
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId02, this.taskId02Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.34
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new KafkaException("Kaboom for t2!", new RuntimeException());
            }
        };
        this.taskManager.tasks().put(this.taskId01, stateMachineTask);
        this.taskManager.tasks().put(this.taskId02, stateMachineTask2);
        KafkaException assertThrows = Assert.assertThrows(KafkaException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        });
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Kaboom for t2!"));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), IsEqual.equalTo((Object) null));
    }

    @Test
    public void shouldTransmitProducerMetrics() {
        MetricName metricName = new MetricName("test_metric", "", "", new HashMap());
        Map singletonMap = Collections.singletonMap(metricName, new KafkaMetric(new Object(), metricName, (metricConfig, j) -> {
            return 0.0d;
        }, (MetricConfig) null, new MockTime()));
        EasyMock.expect(this.activeTaskCreator.producerMetrics()).andReturn(singletonMap);
        EasyMock.replay(new Object[]{this.activeTaskCreator});
        MatcherAssert.assertThat(this.taskManager.producerMetrics(), Matchers.is(singletonMap));
    }

    private Map<TaskId, StateMachineTask> handleAssignment(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2, Map<TaskId, Set<TopicPartition>> map3) {
        Set<Task> set = (Set) map.entrySet().stream().map(entry -> {
            return new StateMachineTask((TaskId) entry.getKey(), (Set) entry.getValue(), true);
        }).collect(Collectors.toSet());
        Set<Task> set2 = (Set) map2.entrySet().stream().map(entry2 -> {
            return new StateMachineTask((TaskId) entry2.getKey(), (Set) entry2.getValue(), false);
        }).collect(Collectors.toSet());
        Set<Task> set3 = (Set) map3.entrySet().stream().map(entry3 -> {
            return new StateMachineTask((TaskId) entry3.getKey(), (Set) entry3.getValue(), true);
        }).collect(Collectors.toSet());
        HashMap hashMap = new HashMap(map);
        hashMap.putAll(map3);
        HashSet hashSet = new HashSet(set);
        hashSet.addAll(set3);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(map))).andStubReturn(set);
        EasyMock.expect(this.standbyTaskCreator.createTasks((Map) EasyMock.eq(map2))).andStubReturn(set2);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(hashMap))).andStubReturn(hashSet);
        expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(map, map2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration()), Matchers.is(true));
        this.taskManager.handleAssignment(hashMap, map2);
        HashMap hashMap2 = new HashMap();
        for (Task task : set) {
            MatcherAssert.assertThat(task.state(), Matchers.is(Task.State.RUNNING));
            hashMap2.put(task.id(), (StateMachineTask) task);
        }
        for (Task task2 : set3) {
            MatcherAssert.assertThat(task2.state(), Matchers.not(Task.State.RUNNING));
            hashMap2.put(task2.id(), (StateMachineTask) task2);
        }
        for (Task task3 : set2) {
            MatcherAssert.assertThat(task3.state(), Matchers.is(Task.State.RUNNING));
            hashMap2.put(task3.id(), (StateMachineTask) task3);
        }
        return hashMap2;
    }

    private void expectLockObtainedFor(TaskId... taskIdArr) throws Exception {
        for (TaskId taskId : taskIdArr) {
            EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(taskId))).andReturn(true).once();
        }
    }

    private void expectLockFailedFor(TaskId... taskIdArr) throws Exception {
        for (TaskId taskId : taskIdArr) {
            EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(taskId))).andReturn(false).once();
        }
    }

    private void expectUnlockFor(TaskId... taskIdArr) throws Exception {
        for (TaskId taskId : taskIdArr) {
            this.stateDirectory.unlock(taskId);
            EasyMock.expectLastCall();
        }
    }

    private static void expectConsumerAssignmentPaused(Consumer<byte[], byte[]> consumer) {
        Set singleton = Collections.singleton(new TopicPartition("assignment", 0));
        EasyMock.expect(consumer.assignment()).andReturn(singleton);
        consumer.pause(singleton);
    }

    @Test
    public void shouldThrowTaskMigratedExceptionOnCommitFailed() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        this.taskManager.tasks().put(this.taskId01, stateMachineTask);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall().andThrow(new CommitFailedException());
        EasyMock.replay(new Object[]{this.consumer});
        TaskMigratedException assertThrows = Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.commitAll();
        });
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.instanceOf(CommitFailedException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated."));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
    }

    @Test
    public void shouldThrowStreamsExceptionOnCommitTimeout() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        this.taskManager.tasks().put(this.taskId01, stateMachineTask);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall().andThrow(new TimeoutException());
        EasyMock.replay(new Object[]{this.consumer});
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            this.taskManager.commitAll();
        });
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.instanceOf(TimeoutException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Timed out while committing offsets via consumer"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
    }

    @Test
    public void shouldStreamsExceptionOnCommitError() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        this.taskManager.tasks().put(this.taskId01, stateMachineTask);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall().andThrow(new KafkaException());
        EasyMock.replay(new Object[]{this.consumer});
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            this.taskManager.commitAll();
        });
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.instanceOf(KafkaException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Error encountered committing offsets via consumer"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
    }

    @Test
    public void shouldFailOnCommitFatal() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        this.taskManager.tasks().put(this.taskId01, stateMachineTask);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM"));
        EasyMock.replay(new Object[]{this.consumer});
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.commitAll();
        })).getMessage(), IsEqual.equalTo("KABOOM"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
    }

    @Test
    public void shouldSuspendAllTasksButSkipCommitIfSuspendingFailsDuringRevocation() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.35
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("KABOOM!");
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        HashMap hashMap = new HashMap(this.taskId00Assignment);
        hashMap.putAll(this.taskId01Assignment);
        EasyMock.expect(this.activeTaskCreator.createTasks((Consumer) EasyMock.anyObject(), (Map) EasyMock.eq(hashMap))).andReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        EasyMock.replay(new Object[]{this.activeTaskCreator, this.consumer});
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleRevocation(Arrays.asList(this.t1p0, this.t1p1));
        })).getCause().getMessage(), Matchers.is("KABOOM!"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.SUSPENDED));
    }

    private static void expectRestoreToBeCompleted(Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader) {
        expectRestoreToBeCompleted(consumer, changelogReader, true);
    }

    private static void expectRestoreToBeCompleted(Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, boolean z) {
        Set singleton = Collections.singleton(new TopicPartition("assignment", 0));
        EasyMock.expect(consumer.assignment()).andReturn(singleton);
        consumer.resume(singleton);
        EasyMock.expectLastCall();
        EasyMock.expect(changelogReader.completedChangelogs()).andReturn(Collections.emptySet()).times(z ? 1 : 0, 1);
    }

    private static KafkaFutureImpl<DeletedRecords> completedFuture() {
        KafkaFutureImpl<DeletedRecords> kafkaFutureImpl = new KafkaFutureImpl<>();
        kafkaFutureImpl.complete((Object) null);
        return kafkaFutureImpl;
    }

    private void makeTaskFolders(String... strArr) throws Exception {
        File[] fileArr = new File[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            fileArr[i] = this.testFolder.newFolder(strArr[i]);
        }
        EasyMock.expect(this.stateDirectory.listNonEmptyTaskDirectories()).andReturn(fileArr).once();
    }

    private void writeCheckpointFile(TaskId taskId, Map<TopicPartition, Long> map) throws Exception {
        File checkpointFile = getCheckpointFile(taskId);
        MatcherAssert.assertThat(Boolean.valueOf(checkpointFile.createNewFile()), Matchers.is(true));
        new OffsetCheckpoint(checkpointFile).write(map);
        EasyMock.expect(this.stateDirectory.checkpointFileFor(taskId)).andReturn(checkpointFile);
    }

    private File getCheckpointFile(TaskId taskId) {
        return new File(new File(this.testFolder.getRoot(), taskId.toString()), ".checkpoint");
    }

    private static ConsumerRecord<byte[], byte[]> getConsumerRecord(TopicPartition topicPartition, long j) {
        return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), j, (Object) null, (Object) null);
    }
}
