/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.server.arrow;

import com.google.auto.service.AutoService;
import com.google.rpc.Code;
import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.extensions.barrage.BarrageMessageWriter;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.server.arrow.ArrowFlightUtil;
import io.deephaven.server.arrow.ExchangeMarshaller;
import io.deephaven.server.arrow.ExchangeRequestHandlerFactory;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ReflexiveUse;
import io.grpc.stub.StreamObserver;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import org.jetbrains.annotations.NotNull;

@ReflexiveUse(referrers={"ArrowFlightUtil"})
public class BarrageSubscriptionRequestHandler
implements ArrowFlightUtil.DoExchangeMarshaller.Handler {
    private final ArrowFlightUtil.DoExchangeMarshaller marshaller;
    private final TicketRouter ticketRouter;
    private final SessionState session;
    private final StreamObserver<BarrageMessageWriter.MessageView> listener;
    private ExchangeMarshaller.Subscription subscriptionObject;
    private Queue<BarrageSubscriptionRequest> preExportSubscriptions;
    private SessionState.ExportObject<?> onExportResolvedContinuation;

    public BarrageSubscriptionRequestHandler(ArrowFlightUtil.DoExchangeMarshaller marshaller, TicketRouter ticketRouter, SessionState session, StreamObserver<BarrageMessageWriter.MessageView> listener) {
        this.marshaller = marshaller;
        this.ticketRouter = ticketRouter;
        this.session = session;
        this.listener = listener;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleMessage(@NotNull BarrageProtoUtil.MessageInfo message) {
        this.validateMessage(message);
        BarrageSubscriptionRequest subscriptionRequest = BarrageSubscriptionRequest.getRootAsBarrageSubscriptionRequest((ByteBuffer)message.app_metadata.msgPayloadAsByteBuffer());
        BarrageSubscriptionRequestHandler barrageSubscriptionRequestHandler = this;
        synchronized (barrageSubscriptionRequestHandler) {
            if (this.subscriptionObject != null) {
                this.apply(subscriptionRequest);
                return;
            }
            if (this.marshaller.isClosed()) {
                return;
            }
            if (this.preExportSubscriptions != null) {
                this.preExportSubscriptions.add(subscriptionRequest);
                return;
            }
        }
        if (subscriptionRequest.ticketVector() == null) {
            GrpcUtil.safelyError(this.listener, (Code)Code.INVALID_ARGUMENT, (String)"Ticket not specified.");
            return;
        }
        this.preExportSubscriptions = new ArrayDeque<BarrageSubscriptionRequest>();
        this.preExportSubscriptions.add(subscriptionRequest);
        String description = "FlightService#DoExchange(subscription, table=" + this.ticketRouter.getLogNameFor(subscriptionRequest.ticketAsByteBuffer(), "table") + ")";
        QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery((String)description, (String)this.session.getSessionId(), (QueryPerformanceNugget.Factory)QueryPerformanceNugget.DEFAULT_FACTORY);
        try (SafeCloseable ignored = queryPerformanceRecorder.startQuery();){
            SessionState.ExportObject table = this.ticketRouter.resolve(this.session, subscriptionRequest.ticketAsByteBuffer(), "table");
            BarrageSubscriptionRequestHandler barrageSubscriptionRequestHandler2 = this;
            synchronized (barrageSubscriptionRequestHandler2) {
                this.onExportResolvedContinuation = this.session.nonExport().queryPerformanceRecorder(queryPerformanceRecorder).require(table).onErrorHandler(this.marshaller::onError).submit(() -> this.onExportResolved(table));
            }
        }
    }

    protected void validateMessage(@NotNull BarrageProtoUtil.MessageInfo message) {
        if (message.app_metadata.msgType() != 5) {
            throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Request type cannot be changed after initialization, expected BarrageSubscriptionRequest metadata");
        }
        if (message.app_metadata.msgPayloadVector() == null) {
            throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Subscription request not supplied");
        }
    }

    private synchronized void onExportResolved(SessionState.ExportObject<Object> parent) {
        this.onExportResolvedContinuation = null;
        if (this.marshaller.isClosed()) {
            this.preExportSubscriptions = null;
            return;
        }
        BarrageSubscriptionRequest subscriptionRequest = this.preExportSubscriptions.remove();
        Object export = parent.get();
        ExchangeMarshaller marshallerForExport = ExchangeMarshaller.getMarshaller(export, this.marshaller.getMarshallers());
        if (marshallerForExport == null) {
            GrpcUtil.safelyError(this.listener, (Code)Code.FAILED_PRECONDITION, (String)("Ticket (" + ExportTicketHelper.toReadableString((ByteBuffer)subscriptionRequest.ticketAsByteBuffer(), (String)"ticket") + ") is not a subscribable table."));
            return;
        }
        BarrageSubscriptionOptions options = BarrageSubscriptionOptions.of((BarrageSubscriptionRequest)subscriptionRequest);
        this.subscriptionObject = marshallerForExport.subscribe(subscriptionRequest, options, export, this.listener);
        LivenessReferent subscriptionManagedReference = this.subscriptionObject.toManage();
        if (subscriptionManagedReference != null) {
            this.marshaller.manage(subscriptionManagedReference);
        }
        for (BarrageSubscriptionRequest request : this.preExportSubscriptions) {
            this.apply(request);
        }
        this.preExportSubscriptions = null;
    }

    private void apply(BarrageSubscriptionRequest subscriptionRequest) {
        boolean subscriptionFound;
        boolean bl = subscriptionFound = this.subscriptionObject != null && this.subscriptionObject.update(subscriptionRequest);
        if (!subscriptionFound) {
            throw Exceptions.statusRuntimeException((Code)Code.NOT_FOUND, (String)"Subscription was not found.");
        }
    }

    @Override
    public synchronized void close() {
        if (this.subscriptionObject != null) {
            this.subscriptionObject.close();
            this.subscriptionObject = null;
        } else {
            GrpcUtil.safelyComplete(this.listener);
        }
        if (this.onExportResolvedContinuation != null) {
            this.onExportResolvedContinuation.cancel();
            this.onExportResolvedContinuation = null;
        }
        if (this.preExportSubscriptions != null) {
            this.preExportSubscriptions = null;
        }
    }

    @AutoService(value={ExchangeRequestHandlerFactory.class})
    public static class BarrageSubscriptionRequestHandlerFactory
    implements ExchangeRequestHandlerFactory {
        @Override
        public byte type() {
            return 5;
        }

        @Override
        public ArrowFlightUtil.DoExchangeMarshaller.Handler create(ArrowFlightUtil.DoExchangeMarshaller marshaller, StreamObserver<BarrageMessageWriter.MessageView> listener) {
            return new BarrageSubscriptionRequestHandler(marshaller, marshaller.getTicketRouter(), marshaller.getSession(), listener);
        }
    }
}

