package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateUpdater;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.class */
class DefaultStateUpdaterTest {
    private static final int COMMIT_INTERVAL = 100;
    private static final long CALL_TIMEOUT = 1000;
    private static final long VERIFICATION_TIMEOUT = 15000;
    private static final TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0);
    private static final TopicPartition TOPIC_PARTITION_A_1 = new TopicPartition("topicA", 1);
    private static final TopicPartition TOPIC_PARTITION_B_0 = new TopicPartition("topicB", 0);
    private static final TopicPartition TOPIC_PARTITION_C_0 = new TopicPartition("topicC", 0);
    private static final TopicPartition TOPIC_PARTITION_D_0 = new TopicPartition("topicD", 0);
    private static final TaskId TASK_0_0 = new TaskId(0, 0);
    private static final TaskId TASK_0_1 = new TaskId(0, 1);
    private static final TaskId TASK_0_2 = new TaskId(0, 2);
    private static final TaskId TASK_1_0 = new TaskId(1, 0);
    private static final TaskId TASK_1_1 = new TaskId(1, 1);
    private final Time time = new MockTime(1);
    private final StreamsConfig config = new StreamsConfig(configProps());
    private final ChangelogReader changelogReader = (ChangelogReader) Mockito.mock(ChangelogReader.class);
    private final Consumer<Set<TopicPartition>> offsetResetter = set -> {
    };
    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(this.config, this.changelogReader, this.time);

    DefaultStateUpdaterTest() {
    }

    @AfterEach
    public void tearDown() {
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
    }

    private Properties configProps() {
        return Utils.mkObjectProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "appId"), Utils.mkEntry("bootstrap.servers", "localhost:2171"), Utils.mkEntry("processing.guarantee", "exactly_once_v2"), Utils.mkEntry("commit.interval.ms", Integer.valueOf(COMMIT_INTERVAL)), Utils.mkEntry(StreamsConfig.producerPrefix("transaction.timeout.ms"), Integer.valueOf(COMMIT_INTERVAL))}));
    }

    @Test
    public void shouldShutdownStateUpdater() {
        this.stateUpdater.start();
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        ((ChangelogReader) Mockito.verify(this.changelogReader)).clear();
    }

    @Test
    public void shouldShutdownStateUpdaterAndRestart() {
        this.stateUpdater.start();
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        this.stateUpdater.start();
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.times(2))).clear();
    }

    @Test
    public void shouldThrowIfStatelessTaskNotInStateRestoring() {
        shouldThrowIfActiveTaskNotInStateRestoring(createStatelessTask(TASK_0_0));
    }

    @Test
    public void shouldThrowIfStatefulTaskNotInStateRestoring() {
        shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)));
    }

    private void shouldThrowIfActiveTaskNotInStateRestoring(StreamTask streamTask) {
        shouldThrowIfTaskNotInGivenState(streamTask, Task.State.RESTORING);
    }

    @Test
    public void shouldThrowIfStandbyTaskNotInStateRunning() {
        shouldThrowIfTaskNotInGivenState(createStandbyTask(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)), Task.State.RUNNING);
    }

    private void shouldThrowIfTaskNotInGivenState(Task task, Task.State state) {
        for (Task.State state2 : Task.State.values()) {
            if (state2 != state) {
                Mockito.when(task.state()).thenReturn(state2);
                Assertions.assertThrows(IllegalStateException.class, () -> {
                    this.stateUpdater.add(task);
                });
            }
        }
    }

    @Test
    public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception {
        shouldImmediatelyAddStatelessTasksToRestoredTasks(createStatelessTaskInStateRestoring(TASK_0_0));
    }

    @Test
    public void shouldImmediatelyAddMultipleStatelessTasksToRestoredTasks() throws Exception {
        shouldImmediatelyAddStatelessTasksToRestoredTasks(createStatelessTaskInStateRestoring(TASK_0_0), createStatelessTaskInStateRestoring(TASK_0_2), createStatelessTaskInStateRestoring(TASK_1_0));
    }

    private void shouldImmediatelyAddStatelessTasksToRestoredTasks(StreamTask... streamTaskArr) throws Exception {
        this.stateUpdater.start();
        for (StreamTask streamTask : streamTaskArr) {
            this.stateUpdater.add(streamTask);
        }
        verifyRestoredActiveTasks(streamTaskArr);
        verifyNeverCheckpointTasks(streamTaskArr);
        verifyUpdatingTasks(new Task[0]);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        verifyRemovedTasks(new Task[0]);
    }

    @Test
    public void shouldRestoreSingleActiveStatefulTask() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Arrays.asList(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_A_0})).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false).thenReturn(false).thenReturn(true);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        verifyRestoredActiveTasks(createActiveStatefulTaskInStateRestoring);
        verifyCheckpointTasks(true, createActiveStatefulTaskInStateRestoring);
        verifyUpdatingTasks(new Task[0]);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        verifyRemovedTasks(new Task[0]);
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.times(1))).enforceRestoreActive();
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.atLeast(3))).restore(ArgumentMatchers.anyMap());
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.never())).transitToUpdateStandby();
    }

    @Test
    public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StreamTask createActiveStatefulTaskInStateRestoring3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_C_0})).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0})).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false).thenReturn(false).thenReturn(false).thenReturn(true);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring3);
        verifyRestoredActiveTasks(createActiveStatefulTaskInStateRestoring3, createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2);
        verifyCheckpointTasks(true, createActiveStatefulTaskInStateRestoring3, createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2);
        verifyUpdatingTasks(new Task[0]);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        verifyRemovedTasks(new Task[0]);
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.times(3))).enforceRestoreActive();
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.atLeast(4))).restore(ArgumentMatchers.anyMap());
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.never())).transitToUpdateStandby();
    }

    @Test
    public void shouldDrainRestoredActiveTasks() throws Exception {
        Assertions.assertTrue(this.stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
        StreamTask createStatelessTaskInStateRestoring = createStatelessTaskInStateRestoring(TASK_0_0);
        this.stateUpdater.start();
        this.stateUpdater.add(createStatelessTaskInStateRestoring);
        verifyDrainingRestoredActiveTasks(createStatelessTaskInStateRestoring);
        StreamTask createStatelessTaskInStateRestoring2 = createStatelessTaskInStateRestoring(TASK_1_1);
        StreamTask createStatelessTaskInStateRestoring3 = createStatelessTaskInStateRestoring(TASK_1_0);
        StreamTask createStatelessTaskInStateRestoring4 = createStatelessTaskInStateRestoring(TASK_0_2);
        this.stateUpdater.add(createStatelessTaskInStateRestoring2);
        this.stateUpdater.add(createStatelessTaskInStateRestoring3);
        this.stateUpdater.add(createStatelessTaskInStateRestoring4);
        verifyDrainingRestoredActiveTasks(createStatelessTaskInStateRestoring2, createStatelessTaskInStateRestoring3, createStatelessTaskInStateRestoring4);
    }

    @Test
    public void shouldUpdateSingleStandbyTask() throws Exception {
        shouldUpdateStandbyTasks(createStandbyTaskInStateRunning(TASK_0_0, Arrays.asList(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)));
    }

    @Test
    public void shouldUpdateMultipleStandbyTasks() throws Exception {
        shouldUpdateStandbyTasks(createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)), createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)), createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)));
    }

    private void shouldUpdateStandbyTasks(StandbyTask... standbyTaskArr) throws Exception {
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        for (StandbyTask standbyTask : standbyTaskArr) {
            this.stateUpdater.add(standbyTask);
        }
        verifyUpdatingStandbyTasks(standbyTaskArr);
        verifyRestoredActiveTasks(new StreamTask[0]);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        verifyRemovedTasks(new Task[0]);
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.times(1))).transitToUpdateStandby();
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.timeout(VERIFICATION_TIMEOUT).atLeast(1))).restore(ArgumentMatchers.anyMap());
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.never())).enforceRestoreActive();
    }

    @Test
    public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        StandbyTask createStandbyTaskInStateRunning2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_A_0})).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        this.stateUpdater.add(createStandbyTaskInStateRunning2);
        verifyRestoredActiveTasks(createActiveStatefulTaskInStateRestoring2, createActiveStatefulTaskInStateRestoring);
        verifyCheckpointTasks(true, createActiveStatefulTaskInStateRestoring2, createActiveStatefulTaskInStateRestoring);
        verifyUpdatingStandbyTasks(createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        verifyRemovedTasks(new Task[0]);
        ((ChangelogReader) Mockito.verify(this.changelogReader, Mockito.atLeast(3))).restore(ArgumentMatchers.anyMap());
        InOrder inOrder = Mockito.inOrder(new Object[]{this.changelogReader, createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2});
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.times(2))).enforceRestoreActive();
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.times(1))).transitToUpdateStandby();
    }

    @Test
    public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_A_0})).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_B_0}));
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        verifyRestoredActiveTasks(createActiveStatefulTaskInStateRestoring);
        verifyCheckpointTasks(true, createActiveStatefulTaskInStateRestoring);
        verifyUpdatingStandbyTasks(createStandbyTaskInStateRunning);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.changelogReader});
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.times(1))).enforceRestoreActive();
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.times(1))).transitToUpdateStandby();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        verifyRestoredActiveTasks(createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2);
        verifyCheckpointTasks(true, createActiveStatefulTaskInStateRestoring2);
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.times(1))).enforceRestoreActive();
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.times(1))).transitToUpdateStandby();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
        Task createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        Task createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(Utils.mkSet(new TaskId[]{createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring2.id()}));
        ((ChangelogReader) Mockito.doThrow(new Throwable[]{taskCorruptedException}).doNothing().when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring), Utils.mkEntry(createActiveStatefulTaskInStateRestoring2.id(), createActiveStatefulTaskInStateRestoring2), Utils.mkEntry(createStandbyTaskInStateRunning.id(), createStandbyTaskInStateRunning)}));
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2}), taskCorruptedException));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.changelogReader});
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.atLeast(1))).enforceRestoreActive();
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.times(1))).transitToUpdateStandby();
    }

    @Test
    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        verifyUpdatingTasks(createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2, createStandbyTaskInStateRunning);
        this.stateUpdater.remove(createActiveStatefulTaskInStateRestoring.id());
        this.stateUpdater.remove(createActiveStatefulTaskInStateRestoring2.id());
        verifyRemovedTasks(createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.changelogReader});
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.atLeast(1))).enforceRestoreActive();
        ((ChangelogReader) inOrder.verify(this.changelogReader, Mockito.times(1))).transitToUpdateStandby();
    }

    @Test
    public void shouldRemoveActiveStatefulTask() throws Exception {
        shouldRemoveStatefulTask(createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)));
    }

    @Test
    public void shouldRemoveStandbyTask() throws Exception {
        shouldRemoveStatefulTask(createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)));
    }

    private void shouldRemoveStatefulTask(Task task) throws Exception {
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.stateUpdater.remove(task.id());
        verifyRemovedTasks(task);
        verifyCheckpointTasks(true, task);
        verifyRestoredActiveTasks(new StreamTask[0]);
        verifyUpdatingTasks(new Task[0]);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        ((ChangelogReader) Mockito.verify(this.changelogReader)).unregister(task.changelogPartitions());
    }

    @Test
    public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
        shouldNotRemoveTaskFromRestoredActiveTasks(createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)));
    }

    @Test
    public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
        shouldNotRemoveTaskFromRestoredActiveTasks(createStatelessTaskInStateRestoring(TASK_0_0));
    }

    private void shouldNotRemoveTaskFromRestoredActiveTasks(StreamTask streamTask) throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(streamTask);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        verifyRestoredActiveTasks(streamTask);
        this.stateUpdater.remove(streamTask.id());
        this.stateUpdater.remove(createActiveStatefulTaskInStateRestoring.id());
        verifyRemovedTasks(createActiveStatefulTaskInStateRestoring);
        verifyRestoredActiveTasks(streamTask);
        verifyUpdatingTasks(new Task[0]);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
    }

    @Test
    public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
        shouldNotRemoveTaskFromFailedTasks(createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)));
    }

    @Test
    public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
        shouldNotRemoveTaskFromFailedTasks(createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void shouldNotRemoveTaskFromFailedTasks(Task task) throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StreamsException streamsException = new StreamsException("Something happened", task.id());
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        ((ChangelogReader) Mockito.doThrow(new Throwable[]{streamsException}).doNothing().when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task), Utils.mkEntry(createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring)}));
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{task}), streamsException);
        verifyExceptionsAndFailedTasks(exceptionAndTasks);
        this.stateUpdater.remove(task.id());
        this.stateUpdater.remove(createActiveStatefulTaskInStateRestoring.id());
        verifyRemovedTasks(createActiveStatefulTaskInStateRestoring);
        verifyExceptionsAndFailedTasks(exceptionAndTasks);
        verifyUpdatingTasks(new Task[0]);
        verifyRestoredActiveTasks(new StreamTask[0]);
    }

    @Test
    public void shouldDrainRemovedTasks() throws Exception {
        Assertions.assertTrue(this.stateUpdater.drainRemovedTasks().isEmpty());
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.remove(createActiveStatefulTaskInStateRestoring.id());
        verifyDrainingRemovedTasks(createActiveStatefulTaskInStateRestoring);
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0));
        StreamTask createActiveStatefulTaskInStateRestoring3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask createActiveStatefulTaskInStateRestoring4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0));
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.remove(createActiveStatefulTaskInStateRestoring2.id());
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring3);
        this.stateUpdater.remove(createActiveStatefulTaskInStateRestoring3.id());
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring4);
        this.stateUpdater.remove(createActiveStatefulTaskInStateRestoring4.id());
        verifyDrainingRemovedTasks(createActiveStatefulTaskInStateRestoring2, createActiveStatefulTaskInStateRestoring3, createActiveStatefulTaskInStateRestoring4);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
        Task createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        Task createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StreamsException streamsException = new StreamsException("The Streams were crossed!");
        ((ChangelogReader) Mockito.doNothing().doThrow(new Throwable[]{streamsException}).when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring), Utils.mkEntry(createStandbyTaskInStateRunning.id(), createStandbyTaskInStateRunning)}));
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring, createStandbyTaskInStateRunning}), streamsException));
        verifyRemovedTasks(new Task[0]);
        verifyUpdatingTasks(new Task[0]);
        verifyRestoredActiveTasks(new StreamTask[0]);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception {
        Task createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        Task createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        StreamsException streamsException = new StreamsException("The Streams were crossed!", createActiveStatefulTaskInStateRestoring.id());
        StreamsException streamsException2 = new StreamsException("The Streams were crossed!", createStandbyTaskInStateRunning.id());
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring), Utils.mkEntry(createActiveStatefulTaskInStateRestoring2.id(), createActiveStatefulTaskInStateRestoring2), Utils.mkEntry(createStandbyTaskInStateRunning.id(), createStandbyTaskInStateRunning)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring2.id(), createActiveStatefulTaskInStateRestoring2), Utils.mkEntry(createStandbyTaskInStateRunning.id(), createStandbyTaskInStateRunning)});
        ((ChangelogReader) Mockito.doNothing().doThrow(new Throwable[]{streamsException}).when(this.changelogReader)).restore(mkMap);
        ((ChangelogReader) Mockito.doNothing().doThrow(new Throwable[]{streamsException2}).when(this.changelogReader)).restore(mkMap2);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring}), streamsException), new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createStandbyTaskInStateRunning}), streamsException2));
        verifyUpdatingTasks(createActiveStatefulTaskInStateRestoring2);
        verifyRestoredActiveTasks(new StreamTask[0]);
        verifyRemovedTasks(new Task[0]);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception {
        Task createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        Task createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(Utils.mkSet(new TaskId[]{createActiveStatefulTaskInStateRestoring.id(), createStandbyTaskInStateRunning.id()}));
        ((ChangelogReader) Mockito.doNothing().doThrow(new Throwable[]{taskCorruptedException}).doNothing().when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring), Utils.mkEntry(createStandbyTaskInStateRunning.id(), createStandbyTaskInStateRunning), Utils.mkEntry(createActiveStatefulTaskInStateRestoring2.id(), createActiveStatefulTaskInStateRestoring2)}));
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring, createStandbyTaskInStateRunning}), taskCorruptedException));
        verifyUpdatingTasks(createActiveStatefulTaskInStateRestoring2);
        verifyRestoredActiveTasks(new StreamTask[0]);
        verifyRemovedTasks(new Task[0]);
    }

    @Test
    public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception {
        Task createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        Task createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!");
        ((ChangelogReader) Mockito.doThrow(new Throwable[]{illegalStateException}).when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring), Utils.mkEntry(createStandbyTaskInStateRunning.id(), createStandbyTaskInStateRunning)}));
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring, createStandbyTaskInStateRunning}), illegalStateException));
        verifyUpdatingTasks(new Task[0]);
        verifyRestoredActiveTasks(new StreamTask[0]);
        verifyRemovedTasks(new Task[0]);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void shouldDrainFailedTasksAndExceptions() throws Exception {
        Assertions.assertTrue(this.stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
        Task createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        Task createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0));
        Task createActiveStatefulTaskInStateRestoring3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        Task createActiveStatefulTaskInStateRestoring4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0));
        StreamsException streamsException = new StreamsException("The Streams were crossed!", createActiveStatefulTaskInStateRestoring.id());
        ((ChangelogReader) Mockito.doThrow(new Throwable[]{streamsException}).when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring)}));
        StreamsException streamsException2 = new StreamsException("The Streams were crossed!", createActiveStatefulTaskInStateRestoring2.id());
        StreamsException streamsException3 = new StreamsException("The Streams were crossed!", createActiveStatefulTaskInStateRestoring3.id());
        StreamsException streamsException4 = new StreamsException("The Streams were crossed!", createActiveStatefulTaskInStateRestoring4.id());
        ((ChangelogReader) Mockito.doThrow(new Throwable[]{streamsException2}).when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring2.id(), createActiveStatefulTaskInStateRestoring2), Utils.mkEntry(createActiveStatefulTaskInStateRestoring3.id(), createActiveStatefulTaskInStateRestoring3), Utils.mkEntry(createActiveStatefulTaskInStateRestoring4.id(), createActiveStatefulTaskInStateRestoring4)}));
        ((ChangelogReader) Mockito.doThrow(new Throwable[]{streamsException3}).when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring3.id(), createActiveStatefulTaskInStateRestoring3), Utils.mkEntry(createActiveStatefulTaskInStateRestoring4.id(), createActiveStatefulTaskInStateRestoring4)}));
        ((ChangelogReader) Mockito.doThrow(new Throwable[]{streamsException4}).when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring4.id(), createActiveStatefulTaskInStateRestoring4)}));
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        verifyDrainingExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring}), streamsException));
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring3);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring4);
        verifyDrainingExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring2}), streamsException2), new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring3}), streamsException3), new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring4}), streamsException4));
    }

    @Test
    public void shouldAutoCheckpointTasksOnInterval() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        StandbyTask createStandbyTaskInStateRunning2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        this.stateUpdater.add(createStandbyTaskInStateRunning2);
        verifyUpdatingTasks(createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2, createStandbyTaskInStateRunning, createStandbyTaskInStateRunning2);
        this.time.sleep(101L);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        verifyCheckpointTasks(false, createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2, createStandbyTaskInStateRunning, createStandbyTaskInStateRunning2);
    }

    @Test
    public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
        MockTime mockTime = new MockTime();
        DefaultStateUpdater defaultStateUpdater = new DefaultStateUpdater(this.config, this.changelogReader, mockTime);
        try {
            StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
            StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
            StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
            StandbyTask createStandbyTaskInStateRunning2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
            Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
            Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
            defaultStateUpdater.start();
            defaultStateUpdater.add(createActiveStatefulTaskInStateRestoring);
            defaultStateUpdater.add(createActiveStatefulTaskInStateRestoring2);
            defaultStateUpdater.add(createStandbyTaskInStateRunning);
            defaultStateUpdater.add(createStandbyTaskInStateRunning2);
            mockTime.sleep(100L);
            verifyNeverCheckpointTasks(createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2, createStandbyTaskInStateRunning, createStandbyTaskInStateRunning2);
            defaultStateUpdater.shutdown(Duration.ofMinutes(1L));
        } catch (Throwable th) {
            defaultStateUpdater.shutdown(Duration.ofMinutes(1L));
            throw th;
        }
    }

    private void verifyCheckpointTasks(boolean z, Task... taskArr) {
        for (Task task : taskArr) {
            ((Task) Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT).atLeast(1))).maybeCheckpoint(z);
        }
    }

    private void verifyNeverCheckpointTasks(Task... taskArr) {
        for (Task task : taskArr) {
            ((Task) Mockito.verify(task, Mockito.never())).maybeCheckpoint(EasyMock.anyBoolean());
        }
    }

    @Test
    public void shouldGetTasksFromInputQueue() {
        this.stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0));
        StandbyTask createStandbyTaskInStateRunning2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        StandbyTask createStandbyTaskInStateRunning3 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        this.stateUpdater.add(createStandbyTaskInStateRunning2);
        this.stateUpdater.remove(TASK_0_0);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.add(createStandbyTaskInStateRunning3);
        Set tasks = this.stateUpdater.getTasks();
        Assertions.assertEquals(5, tasks.size());
        Assertions.assertTrue(tasks.containsAll(Utils.mkSet(new AbstractTask[]{createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2, createStandbyTaskInStateRunning, createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning3})));
        Set activeTasks = this.stateUpdater.getActiveTasks();
        Assertions.assertEquals(2, activeTasks.size());
        Assertions.assertTrue(activeTasks.containsAll(Utils.mkSet(new StreamTask[]{createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2})));
        Set standbyTasks = this.stateUpdater.getStandbyTasks();
        Assertions.assertEquals(3, standbyTasks.size());
        Assertions.assertTrue(standbyTasks.containsAll(Utils.mkSet(new StandbyTask[]{createStandbyTaskInStateRunning, createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning3})));
    }

    @Test
    public void shouldGetTasksFromUpdatingTasks() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0));
        StandbyTask createStandbyTaskInStateRunning2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        StandbyTask createStandbyTaskInStateRunning3 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        this.stateUpdater.add(createStandbyTaskInStateRunning2);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        this.stateUpdater.add(createStandbyTaskInStateRunning3);
        verifyUpdatingTasks(createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2, createStandbyTaskInStateRunning, createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning3);
        Set tasks = this.stateUpdater.getTasks();
        Assertions.assertEquals(5, tasks.size());
        Assertions.assertTrue(tasks.containsAll(Utils.mkSet(new AbstractTask[]{createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2, createStandbyTaskInStateRunning, createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning3})));
        Set activeTasks = this.stateUpdater.getActiveTasks();
        Assertions.assertEquals(2, activeTasks.size());
        Assertions.assertTrue(activeTasks.containsAll(Utils.mkSet(new StreamTask[]{createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2})));
        Set standbyTasks = this.stateUpdater.getStandbyTasks();
        Assertions.assertEquals(3, standbyTasks.size());
        Assertions.assertTrue(standbyTasks.containsAll(Utils.mkSet(new StandbyTask[]{createStandbyTaskInStateRunning, createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning3})));
    }

    @Test
    public void shouldGetTasksFromRestoredActiveTasks() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask createActiveStatefulTaskInStateRestoring2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Utils.mkSet(new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring2);
        verifyRestoredActiveTasks(createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2);
        verifyGetTasks(Utils.mkSet(new StreamTask[]{createActiveStatefulTaskInStateRestoring, createActiveStatefulTaskInStateRestoring2}), Utils.mkSet(new StandbyTask[0]));
        this.stateUpdater.drainRestoredActiveTasks(Duration.ofMinutes(1L));
        verifyGetTasks(Utils.mkSet(new StreamTask[0]), Utils.mkSet(new StandbyTask[0]));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        StandbyTask createStandbyTaskInStateRunning2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(Utils.mkSet(new TaskId[]{createStandbyTaskInStateRunning2.id(), createStandbyTaskInStateRunning.id()}));
        StreamsException streamsException = new StreamsException("The Streams were crossed!", createActiveStatefulTaskInStateRestoring.id());
        ((ChangelogReader) Mockito.doNothing().doThrow(new Throwable[]{taskCorruptedException}).doNothing().when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring), Utils.mkEntry(createStandbyTaskInStateRunning2.id(), createStandbyTaskInStateRunning2), Utils.mkEntry(createStandbyTaskInStateRunning.id(), createStandbyTaskInStateRunning)}));
        ((ChangelogReader) Mockito.doNothing().doThrow(new Throwable[]{streamsException}).doNothing().when(this.changelogReader)).restore(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(createActiveStatefulTaskInStateRestoring.id(), createActiveStatefulTaskInStateRestoring)}));
        this.stateUpdater.start();
        this.stateUpdater.add(createStandbyTaskInStateRunning2);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning}), taskCorruptedException), new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{createActiveStatefulTaskInStateRestoring}), streamsException));
        verifyGetTasks(Utils.mkSet(new StreamTask[]{createActiveStatefulTaskInStateRestoring}), Utils.mkSet(new StandbyTask[]{createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning}));
        this.stateUpdater.drainExceptionsAndFailedTasks();
        verifyGetTasks(Utils.mkSet(new StreamTask[0]), Utils.mkSet(new StandbyTask[0]));
    }

    @Test
    public void shouldGetTasksFromRemovedTasks() throws Exception {
        StreamTask createActiveStatefulTaskInStateRestoring = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask createStandbyTaskInStateRunning = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        StandbyTask createStandbyTaskInStateRunning2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.changelogReader.allChangelogsCompleted())).thenReturn(false);
        this.stateUpdater.start();
        this.stateUpdater.add(createStandbyTaskInStateRunning2);
        this.stateUpdater.add(createActiveStatefulTaskInStateRestoring);
        this.stateUpdater.add(createStandbyTaskInStateRunning);
        this.stateUpdater.remove(createStandbyTaskInStateRunning2.id());
        this.stateUpdater.remove(createStandbyTaskInStateRunning.id());
        this.stateUpdater.remove(createActiveStatefulTaskInStateRestoring.id());
        verifyRemovedTasks(createActiveStatefulTaskInStateRestoring, createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning);
        verifyGetTasks(Utils.mkSet(new StreamTask[]{createActiveStatefulTaskInStateRestoring}), Utils.mkSet(new StandbyTask[]{createStandbyTaskInStateRunning2, createStandbyTaskInStateRunning}));
        this.stateUpdater.drainRemovedTasks();
        verifyGetTasks(Utils.mkSet(new StreamTask[0]), Utils.mkSet(new StandbyTask[0]));
    }

    private void verifyGetTasks(Set<StreamTask> set, Set<StandbyTask> set2) {
        Set tasks = this.stateUpdater.getTasks();
        HashSet hashSet = new HashSet(set);
        hashSet.addAll(set2);
        Assertions.assertEquals(set.size() + set2.size(), tasks.size());
        Assertions.assertTrue(tasks.containsAll(hashSet));
        Set activeTasks = this.stateUpdater.getActiveTasks();
        Assertions.assertEquals(set.size(), activeTasks.size());
        Assertions.assertTrue(activeTasks.containsAll(set));
        Set standbyTasks = this.stateUpdater.getStandbyTasks();
        Assertions.assertEquals(set2.size(), standbyTasks.size());
        Assertions.assertTrue(standbyTasks.containsAll(set2));
    }

    private void verifyRestoredActiveTasks(StreamTask... streamTaskArr) throws Exception {
        if (streamTaskArr.length == 0) {
            Assertions.assertTrue(this.stateUpdater.getRestoredActiveTasks().isEmpty());
            return;
        }
        Set mkSet = Utils.mkSet(streamTaskArr);
        HashSet hashSet = new HashSet();
        TestUtils.waitForCondition(() -> {
            hashSet.addAll(this.stateUpdater.getRestoredActiveTasks());
            return hashSet.containsAll(mkSet) && hashSet.size() == mkSet.size();
        }, VERIFICATION_TIMEOUT, "Did not get all restored active task within the given timeout!");
    }

    private void verifyDrainingRestoredActiveTasks(StreamTask... streamTaskArr) throws Exception {
        Set mkSet = Utils.mkSet(streamTaskArr);
        HashSet hashSet = new HashSet();
        TestUtils.waitForCondition(() -> {
            hashSet.addAll(this.stateUpdater.drainRestoredActiveTasks(Duration.ofMillis(CALL_TIMEOUT)));
            return hashSet.containsAll(mkSet) && hashSet.size() == mkSet.size();
        }, VERIFICATION_TIMEOUT, "Did not get all restored active task within the given timeout!");
        Assertions.assertTrue(this.stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
    }

    private void verifyUpdatingTasks(Task... taskArr) throws Exception {
        if (taskArr.length == 0) {
            Assertions.assertTrue(this.stateUpdater.getUpdatingTasks().isEmpty());
            return;
        }
        Set mkSet = Utils.mkSet(taskArr);
        HashSet hashSet = new HashSet();
        TestUtils.waitForCondition(() -> {
            hashSet.addAll(this.stateUpdater.getUpdatingTasks());
            return hashSet.containsAll(mkSet) && hashSet.size() == mkSet.size();
        }, VERIFICATION_TIMEOUT, "Did not get all updating task within the given timeout!");
    }

    private void verifyUpdatingStandbyTasks(StandbyTask... standbyTaskArr) throws Exception {
        Set mkSet = Utils.mkSet(standbyTaskArr);
        HashSet hashSet = new HashSet();
        TestUtils.waitForCondition(() -> {
            hashSet.addAll(this.stateUpdater.getUpdatingStandbyTasks());
            return hashSet.containsAll(mkSet) && hashSet.size() == mkSet.size();
        }, VERIFICATION_TIMEOUT, "Did not see all standby task within the given timeout!");
    }

    private void verifyRemovedTasks(Task... taskArr) throws Exception {
        if (taskArr.length == 0) {
            Assertions.assertTrue(this.stateUpdater.getRemovedTasks().isEmpty());
            return;
        }
        Set mkSet = Utils.mkSet(taskArr);
        HashSet hashSet = new HashSet();
        TestUtils.waitForCondition(() -> {
            hashSet.addAll(this.stateUpdater.getRemovedTasks());
            return hashSet.containsAll(mkSet) && hashSet.size() == mkSet.size();
        }, VERIFICATION_TIMEOUT, "Did not get all removed task within the given timeout!");
    }

    private void verifyDrainingRemovedTasks(Task... taskArr) throws Exception {
        Set mkSet = Utils.mkSet(taskArr);
        HashSet hashSet = new HashSet();
        TestUtils.waitForCondition(() -> {
            hashSet.addAll(this.stateUpdater.drainRemovedTasks());
            return hashSet.containsAll(Utils.mkSet(taskArr)) && hashSet.size() == mkSet.size();
        }, VERIFICATION_TIMEOUT, "Did not get all restored active task within the given timeout!");
        Assertions.assertTrue(this.stateUpdater.drainRemovedTasks().isEmpty());
    }

    private void verifyExceptionsAndFailedTasks(StateUpdater.ExceptionAndTasks... exceptionAndTasksArr) throws Exception {
        List asList = Arrays.asList(exceptionAndTasksArr);
        HashSet hashSet = new HashSet();
        TestUtils.waitForCondition(() -> {
            hashSet.addAll(this.stateUpdater.getExceptionsAndFailedTasks());
            return hashSet.containsAll(asList) && hashSet.size() == asList.size();
        }, VERIFICATION_TIMEOUT, "Did not get all exceptions and failed tasks within the given timeout!");
    }

    private void verifyDrainingExceptionsAndFailedTasks(StateUpdater.ExceptionAndTasks... exceptionAndTasksArr) throws Exception {
        List asList = Arrays.asList(exceptionAndTasksArr);
        ArrayList arrayList = new ArrayList();
        TestUtils.waitForCondition(() -> {
            arrayList.addAll(this.stateUpdater.drainExceptionsAndFailedTasks());
            return arrayList.containsAll(asList) && arrayList.size() == asList.size();
        }, VERIFICATION_TIMEOUT, "Did not get all exceptions and failed tasks within the given timeout!");
        Assertions.assertTrue(this.stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
    }

    private StreamTask createActiveStatefulTaskInStateRestoring(TaskId taskId, Collection<TopicPartition> collection) {
        StreamTask createActiveStatefulTask = createActiveStatefulTask(taskId, collection);
        Mockito.when(createActiveStatefulTask.state()).thenReturn(Task.State.RESTORING);
        return createActiveStatefulTask;
    }

    private StreamTask createActiveStatefulTask(TaskId taskId, Collection<TopicPartition> collection) {
        StreamTask streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        setupStatefulTask(streamTask, taskId, collection);
        Mockito.when(Boolean.valueOf(streamTask.isActive())).thenReturn(true);
        return streamTask;
    }

    private StreamTask createStatelessTaskInStateRestoring(TaskId taskId) {
        StreamTask createStatelessTask = createStatelessTask(taskId);
        Mockito.when(createStatelessTask.state()).thenReturn(Task.State.RESTORING);
        return createStatelessTask;
    }

    private StreamTask createStatelessTask(TaskId taskId) {
        StreamTask streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        Mockito.when(streamTask.changelogPartitions()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(streamTask.isActive())).thenReturn(true);
        Mockito.when(streamTask.id()).thenReturn(taskId);
        return streamTask;
    }

    private StandbyTask createStandbyTaskInStateRunning(TaskId taskId, Collection<TopicPartition> collection) {
        StandbyTask createStandbyTask = createStandbyTask(taskId, collection);
        Mockito.when(createStandbyTask.state()).thenReturn(Task.State.RUNNING);
        return createStandbyTask;
    }

    private StandbyTask createStandbyTask(TaskId taskId, Collection<TopicPartition> collection) {
        StandbyTask standbyTask = (StandbyTask) Mockito.mock(StandbyTask.class);
        setupStatefulTask(standbyTask, taskId, collection);
        Mockito.when(Boolean.valueOf(standbyTask.isActive())).thenReturn(false);
        return standbyTask;
    }

    private void setupStatefulTask(Task task, TaskId taskId, Collection<TopicPartition> collection) {
        Mockito.when(task.changelogPartitions()).thenReturn(collection);
        Mockito.when(task.id()).thenReturn(taskId);
    }
}
