package org.apache.james.task;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.james.task.Task;
import org.apache.james.task.TaskManagerWorker;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/task/SerialTaskManagerWorkerTest.class */
class SerialTaskManagerWorkerTest {
    private static final Duration UPDATE_INFORMATION_POLLING_DURATION = Duration.ofMillis(100);
    private TaskManagerWorker.Listener listener;
    private SerialTaskManagerWorker worker;
    private final Task successfulTask = new CompletedTask();
    private final Task failedTask = new FailedTask();
    private final Task throwingTask = new ThrowingTask();

    SerialTaskManagerWorkerTest() {
    }

    @BeforeEach
    void beforeEach() {
        this.listener = (TaskManagerWorker.Listener) Mockito.mock(TaskManagerWorker.Listener.class);
        Mockito.when(this.listener.started((TaskId) ArgumentMatchers.any())).thenReturn(Mono.empty());
        Mockito.when(this.listener.cancelled((TaskId) ArgumentMatchers.any(), (Publisher) ArgumentMatchers.any())).thenReturn(Mono.empty());
        Mockito.when(this.listener.completed((TaskId) ArgumentMatchers.any(), (Task.Result) ArgumentMatchers.any(), (Publisher) ArgumentMatchers.any())).thenReturn(Mono.empty());
        Mockito.when(this.listener.updated((TaskId) ArgumentMatchers.any(), (Publisher) ArgumentMatchers.any())).thenReturn(Mono.empty());
        Mockito.when(this.listener.failed((TaskId) ArgumentMatchers.any(), (Publisher) ArgumentMatchers.any())).thenReturn(Mono.empty());
        Mockito.when(this.listener.failed((TaskId) ArgumentMatchers.any(), (Publisher) ArgumentMatchers.any(), (Throwable) ArgumentMatchers.any())).thenReturn(Mono.empty());
        Mockito.when(this.listener.failed((TaskId) ArgumentMatchers.any(), (Publisher) ArgumentMatchers.any(), (Optional) ArgumentMatchers.any(), (Optional) ArgumentMatchers.any())).thenReturn(Mono.empty());
        this.worker = new SerialTaskManagerWorker(this.listener, UPDATE_INFORMATION_POLLING_DURATION);
    }

    @Test
    void aSuccessfullTaskShouldCompleteSuccessfully() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Publisher.class);
        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), this.successfulTask);
        Assertions.assertThat((Task.Result) this.worker.executeTask(taskWithId).block()).isEqualTo(Task.Result.COMPLETED);
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).completed((TaskId) ArgumentMatchers.eq(taskWithId.getId()), (Task.Result) ArgumentMatchers.eq(Task.Result.COMPLETED), (Publisher) forClass.capture());
        Assertions.assertThat((Optional) Mono.from((Publisher) forClass.getValue()).block()).isEmpty();
    }

    @Test
    void aRunningTaskShouldProvideInformationUpdatesDuringExecution() throws InterruptedException {
        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask(atomicLong -> {
            Objects.requireNonNull(atomicLong);
            return (Task.Result) Mono.fromCallable(atomicLong::incrementAndGet).delayElement(Duration.ofMillis(200L)).repeat(10L).then(Mono.just(Task.Result.COMPLETED)).block();
        }));
        this.worker.executeTask(taskWithId).subscribe();
        TimeUnit.MILLISECONDS.sleep(200L);
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).updated((TaskId) ArgumentMatchers.eq(taskWithId.getId()), (Publisher) ArgumentMatchers.notNull());
    }

    @Test
    void aRunningTaskShouldHaveAFiniteNumberOfInformation() {
        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask(atomicLong -> {
            Objects.requireNonNull(atomicLong);
            return (Task.Result) Mono.fromCallable(atomicLong::incrementAndGet).delayElement(Duration.ofMillis(100L)).repeat(3L).then(Mono.just(Task.Result.COMPLETED)).block();
        }));
        this.worker.executeTask(taskWithId).block();
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atMost(4))).updated((TaskId) ArgumentMatchers.eq(taskWithId.getId()), (Publisher) ArgumentMatchers.notNull());
    }

    @Test
    void aRunningTaskShouldEmitAtMostOneInformationPerPeriod() {
        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask(atomicLong -> {
            Objects.requireNonNull(atomicLong);
            return (Task.Result) Mono.fromCallable(atomicLong::incrementAndGet).delayElement(Duration.ofMillis(1L)).repeat(200L).then(Mono.just(Task.Result.COMPLETED)).block();
        }));
        this.worker.executeTask(taskWithId).block();
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atMost(3))).updated((TaskId) ArgumentMatchers.eq(taskWithId.getId()), (Publisher) ArgumentMatchers.notNull());
    }

    @Test
    void errorUponUpdatesShouldNotAbortTheRunningTaskPolledUpdates() {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Mockito.when(this.listener.updated((TaskId) ArgumentMatchers.any(), (Publisher) ArgumentMatchers.any())).thenAnswer(new Answer<Mono<Void>>() { // from class: org.apache.james.task.SerialTaskManagerWorkerTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Mono<Void> m2answer(InvocationOnMock invocationOnMock) {
                return atomicInteger.getAndIncrement() == 1 ? Mono.error(new RuntimeException()) : Mono.empty();
            }
        });
        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask(atomicLong -> {
            Objects.requireNonNull(atomicLong);
            return (Task.Result) Mono.fromCallable(atomicLong::incrementAndGet).delayElement(Duration.ofMillis(1L)).repeat(600L).then(Mono.just(Task.Result.COMPLETED)).block();
        }));
        this.worker.executeTask(taskWithId).block();
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeast(3))).updated((TaskId) ArgumentMatchers.eq(taskWithId.getId()), (Publisher) ArgumentMatchers.notNull());
    }

    @Test
    void aFailedTaskShouldCompleteWithFailedStatus() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Publisher.class);
        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), this.failedTask);
        Assertions.assertThat((Task.Result) this.worker.executeTask(taskWithId).block()).isEqualTo(Task.Result.PARTIAL);
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).failed((TaskId) ArgumentMatchers.eq(taskWithId.getId()), (Publisher) forClass.capture());
        Assertions.assertThat((Optional) Mono.from((Publisher) forClass.getValue()).block()).isEmpty();
    }

    @Test
    void aThrowingTaskShouldCompleteWithFailedStatus() {
        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), this.throwingTask);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Publisher.class);
        Assertions.assertThat((Task.Result) this.worker.executeTask(taskWithId).block()).isEqualTo(Task.Result.PARTIAL);
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).failed((TaskId) ArgumentMatchers.eq(taskWithId.getId()), (Publisher) forClass.capture(), (Throwable) ArgumentMatchers.any(RuntimeException.class));
        Assertions.assertThat((Optional) Mono.from((Publisher) forClass.getValue()).block()).isEmpty();
    }

    @Test
    void theWorkerShouldReportThatATaskIsInProgress() throws InterruptedException {
        TaskId generateTaskId = TaskId.generateTaskId();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.worker.executeTask(new TaskWithId(generateTaskId, new MemoryReferenceTask(() -> {
            countDownLatch2.countDown();
            await(countDownLatch);
            return Task.Result.COMPLETED;
        }))).subscribe();
        await(countDownLatch2);
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).started(generateTaskId);
        Mockito.verifyNoMoreInteractions(new Object[]{this.listener});
        countDownLatch.countDown();
    }

    @Test
    void taskExecutingReactivelyShouldStopExecutionUponCancel() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 100;
        MemoryReferenceTask memoryReferenceTask = new MemoryReferenceTask(() -> {
            return (Task.Result) Flux.interval(Duration.ofMillis(i)).flatMap(l -> {
                return Mono.fromCallable(() -> {
                    atomicInteger.incrementAndGet();
                    return Task.Result.COMPLETED;
                });
            }).reduce(Task::combine).thenReturn(Task.Result.COMPLETED).block();
        });
        TaskId generateTaskId = TaskId.generateTaskId();
        this.worker.executeTask(new TaskWithId(generateTaskId, memoryReferenceTask)).cache().subscribe();
        Awaitility.waitAtMost(Durations.TEN_SECONDS).untilAsserted(() -> {
            ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).started(generateTaskId);
        });
        this.worker.cancelTask(generateTaskId);
        Thread.sleep(100);
        int i2 = atomicInteger.get();
        Thread.sleep(2 * 100);
        Assertions.assertThat(i2).isEqualTo(atomicInteger.get());
    }

    @Test
    void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Publisher.class);
        TaskId generateTaskId = TaskId.generateTaskId();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Mono cache = this.worker.executeTask(new TaskWithId(generateTaskId, new MemoryReferenceTask(() -> {
            await(countDownLatch);
            atomicInteger.incrementAndGet();
            return Task.Result.COMPLETED;
        }))).cache();
        cache.subscribe();
        Awaitility.waitAtMost(Durations.TEN_SECONDS).untilAsserted(() -> {
            ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).started(generateTaskId);
        });
        this.worker.cancelTask(generateTaskId);
        cache.block(Duration.ofSeconds(10L));
        Thread.sleep(50L);
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).cancelled((TaskId) ArgumentMatchers.eq(generateTaskId), (Publisher) forClass.capture());
        Mockito.verifyNoMoreInteractions(new Object[]{this.listener});
        Assertions.assertThat((Optional) Mono.from((Publisher) forClass.getValue()).block()).isEmpty();
    }

    @Test
    void theWorkerShouldCancelAnInProgressAsyncTask() throws InterruptedException {
        TaskId generateTaskId = TaskId.generateTaskId();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Mono cache = this.worker.executeTask(new TaskWithId(generateTaskId, new AsyncSafeTask() { // from class: org.apache.james.task.SerialTaskManagerWorkerTest.2
            /* renamed from: runAsync, reason: merged with bridge method [inline-methods] */
            public Mono<Task.Result> m3runAsync() {
                CountDownLatch countDownLatch2 = countDownLatch;
                return Mono.fromCallable(() -> {
                    SerialTaskManagerWorkerTest.this.await(countDownLatch2);
                    return Task.Result.COMPLETED;
                });
            }

            public TaskType type() {
                return TaskType.of("async memory task");
            }
        })).cache();
        cache.subscribe();
        Awaitility.waitAtMost(Durations.TEN_SECONDS).untilAsserted(() -> {
            ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).started(generateTaskId);
        });
        this.worker.cancelTask(generateTaskId);
        cache.block(Duration.ofSeconds(10L));
        Thread.sleep(50L);
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).cancelled((TaskId) ArgumentMatchers.eq(generateTaskId), (Publisher) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.listener});
    }

    @Test
    void theWorkerShouldRunAsyncTasksInParallel() throws InterruptedException {
        TaskId generateTaskId = TaskId.generateTaskId();
        TaskId generateTaskId2 = TaskId.generateTaskId();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        AsyncSafeTask asyncSafeTask = new AsyncSafeTask() { // from class: org.apache.james.task.SerialTaskManagerWorkerTest.3
            /* renamed from: runAsync, reason: merged with bridge method [inline-methods] */
            public Mono<Task.Result> m4runAsync() {
                CountDownLatch countDownLatch4 = countDownLatch2;
                CountDownLatch countDownLatch5 = countDownLatch;
                return Mono.fromCallable(() -> {
                    countDownLatch4.countDown();
                    SerialTaskManagerWorkerTest.this.await(countDownLatch5);
                    return Task.Result.COMPLETED;
                });
            }

            public TaskType type() {
                return TaskType.of("async memory task");
            }
        };
        AsyncSafeTask asyncSafeTask2 = new AsyncSafeTask() { // from class: org.apache.james.task.SerialTaskManagerWorkerTest.4
            /* renamed from: runAsync, reason: merged with bridge method [inline-methods] */
            public Mono<Task.Result> m5runAsync() {
                CountDownLatch countDownLatch4 = countDownLatch3;
                return Mono.fromCallable(() -> {
                    countDownLatch4.countDown();
                    return Task.Result.COMPLETED;
                });
            }

            public TaskType type() {
                return TaskType.of("async memory task");
            }
        };
        this.worker.executeTask(new TaskWithId(generateTaskId, asyncSafeTask)).subscribe();
        await(countDownLatch2);
        this.worker.executeTask(new TaskWithId(generateTaskId2, asyncSafeTask2)).subscribe();
        await(countDownLatch3);
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).started(generateTaskId);
        ((TaskManagerWorker.Listener) Mockito.verify(this.listener, Mockito.atLeastOnce())).started(generateTaskId2);
        countDownLatch.countDown();
    }

    private void await(CountDownLatch countDownLatch) throws InterruptedException {
        countDownLatch.await();
    }
}
