/*
 * Decompiled with CFR 0.152.
 */
package de.quantummaid.eventmaid.internal.pipe.transport;

import de.quantummaid.eventmaid.internal.exceptions.BubbleUpWrappedException;
import de.quantummaid.eventmaid.internal.pipe.events.PipeEventListener;
import de.quantummaid.eventmaid.internal.pipe.transport.PipeWaitingQueueIsFullException;
import de.quantummaid.eventmaid.internal.pipe.transport.SynchronousDelivery;
import de.quantummaid.eventmaid.internal.pipe.transport.TransportMechanism;
import de.quantummaid.eventmaid.subscribing.Subscriber;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * {@link TransportMechanism} that hands every message to a {@link ThreadPoolExecutor}
 * and performs the actual delivery to the subscribers on one of the executor's threads.
 *
 * @param <T> the type of the transported messages
 */
public final class AsynchronousTransportMechanism<T>
implements TransportMechanism<T> {
    private final PipeEventListener<T> eventListener;
    private final SynchronousDelivery<T> synchronousDelivery;
    private final List<Subscriber<T>> subscribers;
    private final ThreadPoolExecutor threadPoolExecutor;

    /**
     * Creates a transport mechanism that delivers on the given executor.
     *
     * @param eventListener       listener notified about accepted, queued and dequeued messages
     * @param synchronousDelivery delivery strategy invoked on the executor thread
     * @param subscribers         subscribers passed to the delivery strategy for every message
     * @param threadPoolExecutor  executor on which the delivery tasks are run
     */
    public AsynchronousTransportMechanism(PipeEventListener<T> eventListener, SynchronousDelivery<T> synchronousDelivery, List<Subscriber<T>> subscribers, ThreadPoolExecutor threadPoolExecutor) {
        this.eventListener = eventListener;
        this.synchronousDelivery = synchronousDelivery;
        this.subscribers = subscribers;
        this.threadPoolExecutor = threadPoolExecutor;
    }

    /**
     * Reports the message as accepted and queued, then submits a delivery task to the executor.
     *
     * @param message the message to deliver
     * @throws PipeWaitingQueueIsFullException if the executor rejects the delivery task
     */
    @Override
    public void transport(T message) {
        this.eventListener.messageAccepted(message);
        // The queued event is reported before submission so it cannot be overtaken by the
        // dequeued event that the worker task reports once it starts.
        this.eventListener.messageQueued(message);
        try {
            this.threadPoolExecutor.execute(() -> this.deliverOnWorker(message));
        }
        catch (RejectedExecutionException e) {
            throw new PipeWaitingQueueIsFullException();
        }
    }

    /**
     * Shuts the executor down.
     *
     * @param finishRemainingTasks {@code true} to call {@link ThreadPoolExecutor#shutdown()},
     *                             {@code false} to call {@link ThreadPoolExecutor#shutdownNow()}
     */
    @Override
    public void close(boolean finishRemainingTasks) {
        if (finishRemainingTasks) {
            this.threadPoolExecutor.shutdown();
        } else {
            this.threadPoolExecutor.shutdownNow();
        }
    }

    /**
     * @return whether the underlying executor has been shut down
     */
    @Override
    public boolean isShutdown() {
        return this.threadPoolExecutor.isShutdown();
    }

    /**
     * Waits for the underlying executor to terminate.
     *
     * @param timeout  the maximum time to wait
     * @param timeUnit the unit of {@code timeout}
     * @return the result of {@link ThreadPoolExecutor#awaitTermination(long, TimeUnit)}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean awaitTermination(int timeout, TimeUnit timeUnit) throws InterruptedException {
        return this.threadPoolExecutor.awaitTermination(timeout, timeUnit);
    }

    /** Body of the worker task: reports the dequeue event and delivers to the subscribers. */
    private void deliverOnWorker(T message) {
        this.eventListener.messageDequeued(message);
        try {
            this.synchronousDelivery.deliver(message, this.subscribers);
        }
        catch (BubbleUpWrappedException e) {
            throw AsynchronousTransportMechanism.unwrap(e);
        }
    }

    /**
     * Extracts the throwable to propagate out of the worker task from a bubble-up wrapper.
     *
     * <p>A {@link RuntimeException} cause is returned unchanged. An {@link Error} cause is
     * rethrown as is. Any other cause (including a missing one) cannot be thrown from a
     * {@link Runnable} directly, so it is wrapped in an {@link IllegalStateException} that
     * keeps the original throwable as its cause instead of failing with a
     * {@link ClassCastException} or {@link NullPointerException} that hides it.
     */
    private static RuntimeException unwrap(BubbleUpWrappedException wrapper) {
        Throwable cause = wrapper.getCause();
        if (cause instanceof RuntimeException) {
            return (RuntimeException)cause;
        }
        if (cause instanceof Error) {
            throw (Error)cause;
        }
        Throwable origin = cause != null ? cause : wrapper;
        return new IllegalStateException("Message delivery failed with a non-runtime cause", origin);
    }
}

