package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Unit tests for {@link TaskManager}, driven entirely through EasyMock mocks of its collaborators.
 *
 * <p>Every collaborator is a <em>nice</em> mock: calls that were never recorded return default
 * values instead of failing. A test therefore only proves something about the mocks it hands to
 * {@code EasyMock.verify(...)}, and only about the expectations recorded on those mocks.
 */
@RunWith(EasyMockRunner.class)
public class TaskManagerTest {
    // Shared fixture: a single task (0_0) that owns partition t1-0.
    private final TaskId taskId0 = new TaskId(0, 0);
    private final TopicPartition t1p0 = new TopicPartition("t1", 0);
    private final Set<TopicPartition> taskId0Partitions = Utils.mkSet(t1p0);
    private final Map<TaskId, Set<TopicPartition>> taskId0Assignment =
        Collections.singletonMap(taskId0, taskId0Partitions);

    @Mock(type = MockType.NICE)
    private ChangelogReader changeLogReader;

    @Mock(type = MockType.NICE)
    private Consumer<byte[], byte[]> restoreConsumer;

    @Mock(type = MockType.NICE)
    private Consumer<byte[], byte[]> consumer;

    @Mock(type = MockType.NICE)
    private StreamThread.AbstractTaskCreator activeTaskCreator;

    @Mock(type = MockType.NICE)
    private StreamThread.AbstractTaskCreator standbyTaskCreator;

    @Mock(type = MockType.NICE)
    private ThreadMetadataProvider threadMetadataProvider;

    @Mock(type = MockType.NICE)
    private Task firstTask;

    @Mock(type = MockType.NICE)
    private AssignedTasks active;

    @Mock(type = MockType.NICE)
    private AssignedTasks standby;

    private TaskManager taskManager;

    /** Builds a fresh {@link TaskManager} around the mocks and wires in the provider and consumer. */
    @Before
    public void setUp() throws Exception {
        taskManager = new TaskManager(
            changeLogReader, "", restoreConsumer, activeTaskCreator, standbyTaskCreator, active, standby);
        taskManager.setThreadMetadataProvider(threadMetadataProvider);
        taskManager.setConsumer(consumer);
    }

    /**
     * Switches the collaborator mocks from record to replay mode.
     *
     * <p>{@code firstTask} is not part of this list; within this class it is only handed out as a
     * creator return value and used as an identity matcher argument.
     */
    private void replay() {
        EasyMock.replay(
            changeLogReader,
            restoreConsumer,
            consumer,
            activeTaskCreator,
            standbyTaskCreator,
            threadMetadataProvider,
            active,
            standby);
    }

    // ------------------------------------------------------------------ createTasks

    @Test
    public void shouldCloseActiveUnAssignedSuspendedTasksWhenCreatingNewTasks() {
        mockSingleActiveTask();
        active.closeNonAssignedSuspendedTasks(taskId0Assignment);
        EasyMock.expectLastCall();
        replay();

        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(active);
    }

    @Test
    public void shouldCloseStandbyUnAssignedSuspendedTasksWhenCreatingNewTasks() {
        // The recorded expectation takes taskId0Assignment, so taskId0 has to be reported as the
        // *standby* assignment, and the mock being verified has to be the standby one.
        // NOTE(review): assumes createTasks forwards threadMetadataProvider.standbyTasks() to
        // standby.closeNonAssignedSuspendedTasks(...) - confirm against TaskManager.
        mockStandbyTaskExpectations();
        standby.closeNonAssignedSuspendedTasks(taskId0Assignment);
        EasyMock.expectLastCall();
        replay();

        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(standby);
    }

    @Test
    public void shouldResetChangeLogReaderOnCreateTasks() {
        mockSingleActiveTask();
        changeLogReader.reset();
        EasyMock.expectLastCall();
        replay();

        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(changeLogReader);
    }

    @Test
    public void shouldAddNonResumedActiveTasks() {
        mockSingleActiveTask();
        EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
        active.addNewTask(EasyMock.same(firstTask));
        replay();

        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(activeTaskCreator, active);
    }

    @Test
    public void shouldNotAddResumedActiveTasks() {
        EasyMock.checkOrder(active, true);
        mockThreadMetadataProvider(noTasks(), taskId0Assignment);
        EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
        replay();

        taskManager.createTasks(taskId0Partitions);

        // No expectation was recorded on activeTaskCreator or for active.addNewTask(...).
        EasyMock.verify(active, activeTaskCreator);
    }

    @Test
    public void shouldAddNonResumedStandbyTasks() {
        mockStandbyTaskExpectations();
        EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
        standby.addNewTask(EasyMock.same(firstTask));
        replay();

        taskManager.createTasks(taskId0Partitions);

        // Verify the mocks the expectations were recorded on (standby creator + standby tasks),
        // mirroring shouldAddNonResumedActiveTasks.
        // NOTE(review): assumes createTasks adds standby tasks through `standby` - confirm.
        EasyMock.verify(standbyTaskCreator, standby);
    }

    @Test
    public void shouldNotAddResumedStandbyTasks() {
        // NOTE(review): order checking is enabled on `active` although this test records on
        // `standby`; kept as-is because only one expectation is recorded - confirm intent.
        EasyMock.checkOrder(active, true);
        mockThreadMetadataProvider(taskId0Assignment, noTasks());
        EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
        replay();

        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(standby, standbyTaskCreator);
    }

    @Test
    public void shouldPauseActiveUninitializedPartitions() {
        mockSingleActiveTask();
        EasyMock.expect(active.uninitializedPartitions()).andReturn(taskId0Partitions);
        consumer.pause(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(consumer);
    }

    // ------------------------------------------------------------------ suspendTasksAndState

    @Test
    public void shouldSuspendActiveTasks() {
        // A null return from suspend() is the "no error" case used by this test.
        EasyMock.expect(active.suspend()).andReturn(null);
        replay();

        taskManager.suspendTasksAndState();

        EasyMock.verify(active);
    }

    @Test
    public void shouldSuspendStandbyTasks() {
        EasyMock.expect(standby.suspend()).andReturn(null);
        replay();

        taskManager.suspendTasksAndState();

        EasyMock.verify(standby);
    }

    @Test
    public void shouldUnassignChangelogPartitionsOnSuspend() {
        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
        EasyMock.expectLastCall();
        replay();

        taskManager.suspendTasksAndState();

        EasyMock.verify(restoreConsumer);
    }

    @Test
    public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() {
        EasyMock.expect(active.suspend()).andReturn(new RuntimeException(""));
        EasyMock.expect(standby.suspend()).andReturn(new RuntimeException(""));
        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
        EasyMock.expectLastCall();
        replay();

        try {
            taskManager.suspendTasksAndState();
            Assert.fail("Should have thrown streams exception");
        } catch (final StreamsException expected) {
            // Expected. The verify below checks that both suspends and the restore-consumer
            // un-assignment still happened even though an exception is surfaced.
        }

        EasyMock.verify(restoreConsumer, active, standby);
    }

    // ------------------------------------------------------------------ shutdown

    @Test
    public void shouldCloseActiveTasksOnShutdown() {
        active.close(true);
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(true);

        EasyMock.verify(active);
    }

    @Test
    public void shouldCloseStandbyTasksOnShutdown() {
        standby.close(false);
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(false);

        EasyMock.verify(standby);
    }

    @Test
    public void shouldUnassignChangelogPartitionsOnShutdown() {
        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(true);

        EasyMock.verify(restoreConsumer);
    }

    @Test
    public void shouldCloseThreadMetadataProviderOnShutdown() {
        threadMetadataProvider.close();
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(true);

        EasyMock.verify(threadMetadataProvider);
    }

    @Test
    public void shouldNotPropagateExceptionsOnShutdown() {
        threadMetadataProvider.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException());
        replay();

        // Passes as long as shutdown() does not let the RuntimeException escape.
        taskManager.shutdown(false);
    }

    // ------------------------------------------------------------------ updateNewAndRestoringTasks

    @Test
    public void shouldInitializeNewActiveTasks() {
        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
        EasyMock.expect(active.updateRestored(anyPartitions())).andReturn(noPartitions());
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(active);
    }

    @Test
    public void shouldInitializeNewStandbyTasks() {
        EasyMock.expect(standby.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
        EasyMock.expect(active.updateRestored(anyPartitions())).andReturn(noPartitions());
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(standby);
    }

    @Test
    public void shouldRestoreStateFromChangeLogReader() {
        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
        EasyMock.expect(active.updateRestored(taskId0Partitions)).andReturn(noPartitions());
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(changeLogReader, active);
    }

    @Test
    public void shouldResumeRestoredPartitions() {
        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
        EasyMock.expect(active.updateRestored(taskId0Partitions)).andReturn(taskId0Partitions);
        consumer.resume(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(consumer);
    }

    @Test
    public void shouldAssignStandbyPartitionsWhenAllActiveTasksAreRunning() {
        mockAssignStandbyPartitions(1L);
        replay();

        Assert.assertTrue(taskManager.updateNewAndRestoringTasks());

        EasyMock.verify(restoreConsumer);
    }

    @Test
    public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
        EasyMock.expect(active.allTasksRunning()).andReturn(false);
        EasyMock.expect(active.updateRestored(anyPartitions())).andReturn(noPartitions());
        replay();

        Assert.assertFalse(taskManager.updateNewAndRestoringTasks());
    }

    @Test
    public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() {
        mockAssignStandbyPartitions(1L);
        restoreConsumer.seek(t1p0, 1L);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(restoreConsumer);
    }

    @Test
    public void shouldSeekToBeginningIfOffsetIsLessThan0() {
        mockAssignStandbyPartitions(-1L);
        restoreConsumer.seekToBeginning(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(restoreConsumer);
    }

    // ------------------------------------------------------------------ commit / process / punctuate

    @Test
    public void shouldCommitActiveAndStandbyTasks() {
        EasyMock.expect(active.commit()).andReturn(1);
        EasyMock.expect(standby.commit()).andReturn(2);
        replay();

        // 1 (active) + 2 (standby)
        MatcherAssert.assertThat(taskManager.commitAll(), IsEqual.equalTo(3));

        EasyMock.verify(active, standby);
    }

    @Test
    public void shouldPropagateExceptionFromActiveCommit() {
        EasyMock.checkOrder(standby, true);
        active.commit();
        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
        replay();

        try {
            taskManager.commitAll();
            Assert.fail("should have thrown first exception");
        } catch (final Exception expected) {
            // Expected: the failure from active.commit() must reach the caller.
            // Assert.fail raises an AssertionError, which this catch does not intercept.
        }

        EasyMock.verify(active, standby);
    }

    @Test
    public void shouldPropagateExceptionFromStandbyCommit() {
        EasyMock.expect(standby.commit()).andThrow(new RuntimeException(""));
        replay();

        try {
            taskManager.commitAll();
            Assert.fail("should have thrown exception");
        } catch (final Exception expected) {
            // Expected: the failure from standby.commit() must reach the caller.
        }

        EasyMock.verify(standby);
    }

    @Test
    public void shouldMaybeCommitActiveTasks() {
        EasyMock.expect(active.maybeCommit()).andReturn(5);
        replay();

        MatcherAssert.assertThat(taskManager.maybeCommitActiveTasks(), IsEqual.equalTo(5));

        EasyMock.verify(active);
    }

    @Test
    public void shouldProcessActiveTasks() {
        EasyMock.expect(active.process()).andReturn(10);
        replay();

        MatcherAssert.assertThat(taskManager.process(), IsEqual.equalTo(10));

        EasyMock.verify(active);
    }

    @Test
    public void shouldPunctuateActiveTasks() {
        EasyMock.expect(active.punctuate()).andReturn(20);
        replay();

        MatcherAssert.assertThat(taskManager.punctuate(), IsEqual.equalTo(20));

        EasyMock.verify(active);
    }

    @Test
    public void shouldResumeConsumptionOfInitializedPartitions() {
        final Set<TopicPartition> initialized = Collections.singleton(new TopicPartition("topic", 0));
        EasyMock.expect(active.initializeNewTasks()).andReturn(initialized);
        EasyMock.expect(active.updateRestored(anyPartitions())).andReturn(noPartitions());
        consumer.resume(initialized);
        EasyMock.expectLastCall();
        // Only the two mocks carrying expectations are replayed here (not the shared replay()).
        EasyMock.replay(active, consumer);

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(consumer);
    }

    // ------------------------------------------------------------------ fixtures

    /**
     * Records the expectations for an update cycle in which all active tasks report running and one
     * running standby task has a checkpoint for {@code t1p0}; the restore consumer is expected to be
     * assigned {@code taskId0Partitions}.
     *
     * @param offset checkpointed offset the standby task reports for {@code t1p0}
     */
    private void mockAssignStandbyPartitions(final long offset) {
        final Task standbyTask = EasyMock.createNiceMock(Task.class);
        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
        EasyMock.expect(active.allTasksRunning()).andReturn(true);
        EasyMock.expect(active.updateRestored(anyPartitions())).andReturn(noPartitions());
        EasyMock.expect(standby.running()).andReturn(Collections.singletonList(standbyTask));
        EasyMock.expect(standbyTask.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
        restoreConsumer.assign(taskId0Partitions);
        EasyMock.expectLastCall();
        // This locally created mock is not covered by replay(), so replay it here.
        EasyMock.replay(standbyTask);
    }

    /** Reports {@code taskId0} as the only standby task and makes the standby creator return {@code firstTask}. */
    private void mockStandbyTaskExpectations() {
        mockThreadMetadataProvider(taskId0Assignment, noTasks());
        EasyMock.expect(
                standbyTaskCreator.createTasks(
                    EasyMock.<Consumer<byte[], byte[]>>anyObject(), EasyMock.eq(taskId0Assignment)))
            .andReturn(Collections.singletonList(firstTask));
    }

    /** Reports {@code taskId0} as the only active task and makes the active creator return {@code firstTask}. */
    private void mockSingleActiveTask() {
        mockThreadMetadataProvider(noTasks(), taskId0Assignment);
        EasyMock.expect(
                activeTaskCreator.createTasks(
                    EasyMock.<Consumer<byte[], byte[]>>anyObject(), EasyMock.eq(taskId0Assignment)))
            .andReturn(Collections.singletonList(firstTask));
    }

    /**
     * Stubs the metadata provider's assignments; both stubs may be hit any number of times.
     *
     * @param standbyAssignment value returned by {@code standbyTasks()}
     * @param activeAssignment value returned by {@code activeTasks()}
     */
    private void mockThreadMetadataProvider(
            final Map<TaskId, Set<TopicPartition>> standbyAssignment,
            final Map<TaskId, Set<TopicPartition>> activeAssignment) {
        EasyMock.expect(threadMetadataProvider.standbyTasks()).andReturn(standbyAssignment).anyTimes();
        EasyMock.expect(threadMetadataProvider.activeTasks()).andReturn(activeAssignment).anyTimes();
    }

    /** Typed empty task assignment (explicit type witness keeps pre-Java-8 inference happy). */
    private static Map<TaskId, Set<TopicPartition>> noTasks() {
        return Collections.<TaskId, Set<TopicPartition>>emptyMap();
    }

    /** Typed empty partition set. */
    private static Set<TopicPartition> noPartitions() {
        return Collections.<TopicPartition>emptySet();
    }

    /** Registers an "any argument" matcher typed as a partition collection; must be called in argument position. */
    private static Collection<TopicPartition> anyPartitions() {
        return EasyMock.<Collection<TopicPartition>>anyObject();
    }
}
