package org.apache.james.task.eventsourcing;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.task.Hostname;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import scala.Option;

/* loaded from: input_file:org/apache/james/task/eventsourcing/TerminationSubscriberContract.class */
public interface TerminationSubscriberContract {
    public static final Completed COMPLETED_EVENT = new Completed(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), Task.Result.COMPLETED, Option.empty());
    public static final Failed FAILED_EVENT = new Failed(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), Option.empty(), Option.empty(), Option.empty());
    public static final Cancelled CANCELLED_EVENT = new Cancelled(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), Option.empty());
    public static final Duration DELAY_BETWEEN_EVENTS = Duration.ofMillis(50);
    public static final Duration DELAY_BEFORE_PUBLISHING = Duration.ofMillis(50);
    public static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();

    TerminationSubscriber subscriber() throws Exception;

    @Test
    default void handlingCompletedShouldBeListed() throws Exception {
        TerminationSubscriber subscriber = subscriber();
        sendEvents(subscriber, COMPLETED_EVENT);
        assertEvents(subscriber).containsOnly(new Event[]{COMPLETED_EVENT});
    }

    @Test
    default void handlingFailedShouldBeListed() throws Exception {
        TerminationSubscriber subscriber = subscriber();
        sendEvents(subscriber, FAILED_EVENT);
        assertEvents(subscriber).containsOnly(new Event[]{FAILED_EVENT});
    }

    @Test
    default void handlingCancelledShouldBeListed() throws Exception {
        TerminationSubscriber subscriber = subscriber();
        sendEvents(subscriber, CANCELLED_EVENT);
        assertEvents(subscriber).containsOnly(new Event[]{CANCELLED_EVENT});
    }

    @Test
    default void handlingNonTerminalEventShouldNotBeListed() throws Exception {
        TerminationSubscriber subscriber = subscriber();
        sendEvents(subscriber, new Started(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), new Hostname("foo")));
        assertEvents(subscriber).isEmpty();
    }

    @Test
    default void handlingMultipleEventsShouldBeListed() throws Exception {
        TerminationSubscriber subscriber = subscriber();
        sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
        assertEvents(subscriber).containsExactly(new Event[]{COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT});
    }

    @Test
    default void multipleListeningEventsShouldShareEvents() throws Exception {
        TerminationSubscriber subscriber = subscriber();
        Flux from = Flux.from(subscriber.listenEvents());
        Flux from2 = Flux.from(subscriber.listenEvents());
        sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
        ArrayList arrayList = new ArrayList();
        Objects.requireNonNull(arrayList);
        from.subscribe((v1) -> {
            r1.add(v1);
        });
        ArrayList arrayList2 = new ArrayList();
        Objects.requireNonNull(arrayList2);
        from2.subscribe((v1) -> {
            r1.add(v1);
        });
        Awaitility.await().atMost(Durations.ONE_MINUTE).until(() -> {
            return Boolean.valueOf(arrayList.size() == 3 && arrayList2.size() == 3);
        });
        Assertions.assertThat(arrayList).containsExactly(new Event[]{COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT});
        Assertions.assertThat(arrayList2).containsExactly(new Event[]{COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT});
    }

    @Test
    default void dynamicListeningEventsShouldGetOnlyNewEvents() throws Exception {
        TerminationSubscriber subscriber = subscriber();
        sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
        Assertions.assertThat((List) Mono.delay(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(3L).dividedBy(2L))).then(Mono.defer(() -> {
            return collectEvents(subscriber.listenEvents());
        })).subscribeOn(Schedulers.fromExecutor(EXECUTOR)).block()).containsExactly(new Event[]{FAILED_EVENT, CANCELLED_EVENT});
    }

    default ListAssert<Event> assertEvents(TerminationSubscriber terminationSubscriber) {
        return Assertions.assertThat((List) collectEvents(terminationSubscriber.listenEvents()).block());
    }

    default Mono<List<Event>> collectEvents(Publisher<Event> publisher) {
        return Flux.from(publisher).subscribeOn(Schedulers.fromExecutor(EXECUTOR)).take(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(7L))).collectList();
    }

    default void sendEvents(TerminationSubscriber terminationSubscriber, Event... eventArr) {
        Mono.delay(DELAY_BEFORE_PUBLISHING).flatMapMany(l -> {
            Flux map = Flux.fromArray(eventArr).subscribeOn(Schedulers.fromExecutor(EXECUTOR)).delayElements(DELAY_BETWEEN_EVENTS).map(EventWithState::noState);
            Objects.requireNonNull(terminationSubscriber);
            return map.doOnNext(terminationSubscriber::handle);
        }).subscribe();
    }
}
