/*
 * Decompiled with CFR 0.152.
 */
package de.quantummaid.eventmaid.messageFunction;

import de.quantummaid.eventmaid.exceptions.AlreadyClosedException;
import de.quantummaid.eventmaid.identification.CorrelationId;
import de.quantummaid.eventmaid.identification.MessageId;
import de.quantummaid.eventmaid.messageBus.MessageBus;
import de.quantummaid.eventmaid.messageFunction.MessageFunction;
import de.quantummaid.eventmaid.messageFunction.ResponseFuture;
import de.quantummaid.eventmaid.messageFunction.internal.ExpectedResponseFuture;
import de.quantummaid.eventmaid.messageFunction.internal.SubscriptionContainer;
import de.quantummaid.eventmaid.processingContext.EventType;
import de.quantummaid.eventmaid.processingContext.ProcessingContext;
import de.quantummaid.eventmaid.subscribing.SubscriptionId;
import lombok.NonNull;

final class MessageFunctionImpl
implements MessageFunction {
    private final MessageBus messageBus;
    private volatile boolean closed;

    private MessageFunctionImpl(@NonNull MessageBus messageBus) {
        if (messageBus == null) {
            throw new NullPointerException("messageBus is marked non-null but is null");
        }
        this.messageBus = messageBus;
    }

    static MessageFunctionImpl messageFunction(@NonNull MessageBus messageBus) {
        if (messageBus == null) {
            throw new NullPointerException("messageBus is marked non-null but is null");
        }
        return new MessageFunctionImpl(messageBus);
    }

    @Override
    public ResponseFuture request(EventType eventType, Object request) {
        if (this.closed) {
            throw new AlreadyClosedException();
        }
        RequestHandle requestHandle = new RequestHandle(this.messageBus);
        requestHandle.send(eventType, request);
        return requestHandle.getResponseFuture();
    }

    @Override
    public void close() {
        this.closed = true;
    }

    private static final class RequestHandle {
        private final ExpectedResponseFuture responseFuture;
        private final MessageBus messageBus;
        private final SubscriptionContainer subscriptionContainer;
        private volatile boolean alreadyFinishedOrCancelled;

        RequestHandle(MessageBus messageBus) {
            this.messageBus = messageBus;
            this.subscriptionContainer = SubscriptionContainer.subscriptionContainer(messageBus);
            this.responseFuture = ExpectedResponseFuture.expectedResponseFuture(this.subscriptionContainer);
        }

        public synchronized void send(EventType eventType, Object request) {
            MessageId messageId = MessageId.newUniqueMessageId();
            CorrelationId correlationId = CorrelationId.correlationIdFor(messageId);
            SubscriptionId answerSubscriptionId = this.messageBus.subscribe(correlationId, this::fulFillFuture);
            SubscriptionId errorSubscriptionId1 = this.messageBus.onException(correlationId, (processingContext, e) -> this.fulFillFuture((Exception)e));
            SubscriptionId errorSubscriptionId2 = this.messageBus.onException(eventType, (processingContext, e) -> {
                if (processingContext.getPayload() == request) {
                    this.fulFillFuture((Exception)e);
                }
            });
            this.subscriptionContainer.setSubscriptionIds(answerSubscriptionId, errorSubscriptionId1, errorSubscriptionId2);
            ProcessingContext<Object> processingContext2 = ProcessingContext.processingContext(eventType, messageId, request);
            try {
                this.messageBus.send(processingContext2);
            }
            catch (Exception e2) {
                this.fulFillFuture(e2);
            }
        }

        private synchronized void fulFillFuture(ProcessingContext<Object> processingContext) {
            if (this.alreadyFinishedOrCancelled) {
                return;
            }
            this.alreadyFinishedOrCancelled = true;
            this.responseFuture.fullFill(processingContext);
        }

        private synchronized void fulFillFuture(Exception exception) {
            if (this.alreadyFinishedOrCancelled) {
                return;
            }
            this.alreadyFinishedOrCancelled = true;
            this.responseFuture.fullFillWithException(exception);
        }

        public ExpectedResponseFuture getResponseFuture() {
            return this.responseFuture;
        }
    }
}

