package org.apache.causeway.extensions.sse.wicket.services;

import jakarta.annotation.Priority;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Predicate;
import org.apache.causeway.applib.services.iactnlayer.InteractionService;
import org.apache.causeway.applib.services.xactn.TransactionService;
import org.apache.causeway.commons.internal.collections._Lists;
import org.apache.causeway.extensions.sse.applib.annotations.SseSource;
import org.apache.causeway.extensions.sse.applib.service.SseChannel;
import org.apache.causeway.extensions.sse.applib.service.SseService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Named(SseServiceDefault.LOGICAL_TYPE_NAME)
@Priority(1073741823)
@Service
@Qualifier("Default")
/* loaded from: input_file:org/apache/causeway/extensions/sse/wicket/services/SseServiceDefault.class */
public class SseServiceDefault implements SseService {
    private static final Logger log = LogManager.getLogger(SseServiceDefault.class);
    public static final String LOGICAL_TYPE_NAME = "causeway.ext.sse.SseServiceDefault";

    @Inject
    private TransactionService transactionService;

    @Inject
    private InteractionService interactionService;
    private final EventStreamPool eventStreamPool = new EventStreamPool();

    /* renamed from: org.apache.causeway.extensions.sse.wicket.services.SseServiceDefault$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/causeway/extensions/sse/wicket/services/SseServiceDefault$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$causeway$extensions$sse$applib$service$SseService$ExecutionBehavior = new int[SseService.ExecutionBehavior.values().length];

        static {
            try {
                $SwitchMap$org$apache$causeway$extensions$sse$applib$service$SseService$ExecutionBehavior[SseService.ExecutionBehavior.SIMPLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$causeway$extensions$sse$applib$service$SseService$ExecutionBehavior[SseService.ExecutionBehavior.REQUIRES_NEW_SESSION.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/apache/causeway/extensions/sse/wicket/services/SseServiceDefault$EventStreamDefault.class */
    private static final class EventStreamDefault implements SseChannel {
        private static final Logger log = LogManager.getLogger(EventStreamDefault.class);
        private static final Object $LOCK = new Object[0];
        private final UUID id;
        private final Class<?> sourceType;
        private final CountDownLatch latch = new CountDownLatch(1);
        private final Queue<Predicate<SseSource>> listeners = new ConcurrentLinkedQueue();

        public void fire(SseSource sseSource) {
            synchronized ($LOCK) {
                if (isActive()) {
                    ArrayList newArrayList = _Lists.newArrayList(this.listeners);
                    log.debug("about to fire events to {} listeners", new Supplier[]{() -> {
                        return Integer.valueOf(newArrayList.size());
                    }});
                    ArrayList newArrayList2 = _Lists.newArrayList();
                    newArrayList.forEach(predicate -> {
                        if (predicate.test(sseSource)) {
                            return;
                        }
                        newArrayList2.add(predicate);
                    });
                    synchronized ($LOCK) {
                        if (isActive()) {
                            this.listeners.removeAll(newArrayList2);
                        }
                    }
                }
            }
        }

        public void listenWhile(Predicate<SseSource> predicate) {
            synchronized ($LOCK) {
                if (isActive()) {
                    this.listeners.add(predicate);
                }
            }
        }

        public void close() {
            synchronized ($LOCK) {
                this.listeners.clear();
                this.latch.countDown();
            }
        }

        private boolean isActive() {
            return this.latch.getCount() > 0;
        }

        public void awaitClose() throws InterruptedException {
            this.latch.await();
        }

        public EventStreamDefault(UUID uuid, Class<?> cls) {
            this.id = uuid;
            this.sourceType = cls;
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        public Queue<Predicate<SseSource>> getListeners() {
            return this.listeners;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof EventStreamDefault)) {
                return false;
            }
            EventStreamDefault eventStreamDefault = (EventStreamDefault) obj;
            UUID id = getId();
            UUID id2 = eventStreamDefault.getId();
            if (id == null) {
                if (id2 != null) {
                    return false;
                }
            } else if (!id.equals(id2)) {
                return false;
            }
            Class<?> sourceType = getSourceType();
            Class<?> sourceType2 = eventStreamDefault.getSourceType();
            if (sourceType == null) {
                if (sourceType2 != null) {
                    return false;
                }
            } else if (!sourceType.equals(sourceType2)) {
                return false;
            }
            CountDownLatch latch = getLatch();
            CountDownLatch latch2 = eventStreamDefault.getLatch();
            if (latch == null) {
                if (latch2 != null) {
                    return false;
                }
            } else if (!latch.equals(latch2)) {
                return false;
            }
            Queue<Predicate<SseSource>> listeners = getListeners();
            Queue<Predicate<SseSource>> listeners2 = eventStreamDefault.getListeners();
            return listeners == null ? listeners2 == null : listeners.equals(listeners2);
        }

        public int hashCode() {
            UUID id = getId();
            int hashCode = (1 * 59) + (id == null ? 43 : id.hashCode());
            Class<?> sourceType = getSourceType();
            int hashCode2 = (hashCode * 59) + (sourceType == null ? 43 : sourceType.hashCode());
            CountDownLatch latch = getLatch();
            int hashCode3 = (hashCode2 * 59) + (latch == null ? 43 : latch.hashCode());
            Queue<Predicate<SseSource>> listeners = getListeners();
            return (hashCode3 * 59) + (listeners == null ? 43 : listeners.hashCode());
        }

        public String toString() {
            return "SseServiceDefault.EventStreamDefault(id=" + getId() + ", sourceType=" + getSourceType() + ", latch=" + getLatch() + ", listeners=" + getListeners() + ")";
        }

        public UUID getId() {
            return this.id;
        }

        public Class<?> getSourceType() {
            return this.sourceType;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/causeway/extensions/sse/wicket/services/SseServiceDefault$EventStreamLifecycle.class */
    public static class EventStreamLifecycle {
        private static final Object $LOCK = new Object[0];
        private final SseChannel eventStream;
        private final EventStreamPool eventStreamPool;
        private int runningTasksCounter;

        public void acquire() {
            synchronized ($LOCK) {
                this.runningTasksCounter++;
            }
        }

        public void release() {
            int i;
            synchronized ($LOCK) {
                i = this.runningTasksCounter - 1;
                this.runningTasksCounter = i;
                if (i < 1) {
                    this.eventStreamPool.eventStreamsByType.remove(this.eventStream.getSourceType());
                }
            }
            if (i < 1) {
                this.eventStream.close();
            }
        }

        private EventStreamLifecycle(SseChannel sseChannel, EventStreamPool eventStreamPool) {
            this.eventStream = sseChannel;
            this.eventStreamPool = eventStreamPool;
        }

        public static EventStreamLifecycle of(SseChannel sseChannel, EventStreamPool eventStreamPool) {
            return new EventStreamLifecycle(sseChannel, eventStreamPool);
        }

        public SseChannel getEventStream() {
            return this.eventStream;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/causeway/extensions/sse/wicket/services/SseServiceDefault$EventStreamPool.class */
    public static class EventStreamPool {
        private final Map<Class<?>, EventStreamLifecycle> eventStreamsByType = new ConcurrentHashMap();

        private EventStreamPool() {
        }

        public Optional<SseChannel> lookupByType(Class<?> cls) {
            return Optional.ofNullable(this.eventStreamsByType.get(cls)).map((v0) -> {
                return v0.getEventStream();
            });
        }

        public synchronized EventStreamLifecycle acquireLifecycleForType(Class<?> cls) {
            EventStreamLifecycle computeIfAbsent = this.eventStreamsByType.computeIfAbsent(cls, cls2 -> {
                return EventStreamLifecycle.of(new EventStreamDefault(UUID.randomUUID(), cls), this);
            });
            computeIfAbsent.acquire();
            return computeIfAbsent;
        }
    }

    public Optional<SseChannel> lookupByType(Class<?> cls) {
        return this.eventStreamPool.lookupByType(cls);
    }

    public void submit(SseSource sseSource, SseService.ExecutionBehavior executionBehavior) {
        Objects.requireNonNull(sseSource);
        Objects.requireNonNull(executionBehavior);
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        switch (AnonymousClass1.$SwitchMap$org$apache$causeway$extensions$sse$applib$service$SseService$ExecutionBehavior[executionBehavior.ordinal()]) {
            case 1:
                CompletableFuture.runAsync(() -> {
                    run(sseSource);
                }, commonPool);
                return;
            case 2:
            default:
                CompletableFuture.runAsync(() -> {
                    this.interactionService.runAnonymous(() -> {
                        this.transactionService.runWithinCurrentTransactionElseCreateNew(() -> {
                            run(sseSource);
                        });
                    });
                }, commonPool);
                return;
        }
    }

    private void run(SseSource sseSource) {
        Class<?> cls = sseSource.getClass();
        EventStreamLifecycle acquireLifecycleForType = this.eventStreamPool.acquireLifecycleForType(cls);
        SseChannel eventStream = acquireLifecycleForType.getEventStream();
        log.debug("submitting task type='{}' -> stream='{}'", cls, eventStream.getId());
        try {
            try {
                sseSource.run(eventStream);
                acquireLifecycleForType.release();
            } catch (Exception e) {
                log.warn("task run failed on source type {} failed", cls, e);
                acquireLifecycleForType.release();
            }
        } catch (Throwable th) {
            acquireLifecycleForType.release();
            throw th;
        }
    }
}
