/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.server.arrow;

import com.google.auto.service.AutoService;
import com.google.rpc.Code;
import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.extensions.barrage.BarrageMessageWriter;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.arrow.ArrowFlightUtil;
import io.deephaven.server.arrow.ExchangeMarshaller;
import io.deephaven.server.arrow.ExchangeRequestHandlerFactory;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ReflexiveUse;
import io.grpc.stub.StreamObserver;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import org.jetbrains.annotations.NotNull;

@ReflexiveUse(referrers={"ArrowFlightUtil"})
public class BarrageSnapshotRequestHandler
implements ArrowFlightUtil.DoExchangeMarshaller.Handler {
    private final AtomicReference<ArrowFlightUtil.HalfClosedState> halfClosedState = new AtomicReference<ArrowFlightUtil.HalfClosedState>(ArrowFlightUtil.HalfClosedState.DONT_CLOSE);
    private final ArrowFlightUtil.DoExchangeMarshaller marshaller;
    private final TicketRouter ticketRouter;
    private final SessionState session;
    private final StreamObserver<BarrageMessageWriter.MessageView> listener;
    private final BarrageMessageWriter.Factory streamGeneratorFactory;

    public BarrageSnapshotRequestHandler(ArrowFlightUtil.DoExchangeMarshaller marshaller, TicketRouter ticketRouter, SessionState session, BarrageMessageWriter.Factory streamGeneratorFactory, StreamObserver<BarrageMessageWriter.MessageView> listener) {
        this.marshaller = marshaller;
        this.ticketRouter = ticketRouter;
        this.session = session;
        this.listener = listener;
        this.streamGeneratorFactory = streamGeneratorFactory;
    }

    @Override
    public void handleMessage(@NotNull BarrageProtoUtil.MessageInfo message) {
        this.validateMessage(message);
        BarrageSnapshotRequest snapshotRequest = BarrageSnapshotRequest.getRootAsBarrageSnapshotRequest((ByteBuffer)message.app_metadata.msgPayloadAsByteBuffer());
        String ticketLogName = this.ticketRouter.getLogNameFor(snapshotRequest.ticketAsByteBuffer(), "table");
        String description = "FlightService#DoExchange(snapshot, table=" + ticketLogName + ")";
        QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery((String)description, (String)this.session.getSessionId(), (QueryPerformanceNugget.Factory)QueryPerformanceNugget.DEFAULT_FACTORY);
        try (SafeCloseable ignored = queryPerformanceRecorder.startQuery();){
            SessionState.ExportObject tableExport = this.ticketRouter.resolve(this.session, snapshotRequest.ticketAsByteBuffer(), "table");
            BarragePerformanceLog.SnapshotMetricsHelper metrics = new BarragePerformanceLog.SnapshotMetricsHelper();
            long queueStartTm = System.nanoTime();
            this.session.nonExport().queryPerformanceRecorder(queryPerformanceRecorder).require(tableExport).onError(this.listener).onSuccess(() -> {
                ArrowFlightUtil.HalfClosedState newState = this.halfClosedState.updateAndGet(current -> {
                    switch (current) {
                        case DONT_CLOSE: {
                            return ArrowFlightUtil.HalfClosedState.FINISHED_SENDING;
                        }
                        case CLIENT_HALF_CLOSED: {
                            return ArrowFlightUtil.HalfClosedState.CLOSED;
                        }
                        case FINISHED_SENDING: 
                        case CLOSED: {
                            throw new IllegalStateException("Can't finish streaming twice");
                        }
                    }
                    throw new IllegalStateException("Unknown state " + String.valueOf(current));
                });
                if (newState == ArrowFlightUtil.HalfClosedState.CLOSED) {
                    GrpcUtil.safelyComplete(this.listener);
                }
            }).submit(() -> {
                metrics.queueNanos = System.nanoTime() - queueStartTm;
                Object export = tableExport.get();
                ExchangeMarshaller marshallerForExport = ExchangeMarshaller.getMarshaller(export, this.marshaller.getMarshallers());
                if (marshallerForExport == null) {
                    throw Exceptions.statusRuntimeException((Code)Code.FAILED_PRECONDITION, (String)("Ticket (" + ticketLogName + ") is has no associated exchange marshaller."));
                }
                BarrageSnapshotOptions options = BarrageSnapshotOptions.of((BarrageSnapshotRequest)snapshotRequest);
                marshallerForExport.snapshot(snapshotRequest, options, export, metrics, this.listener, ticketLogName, this.streamGeneratorFactory);
            });
        }
    }

    protected void validateMessage(@NotNull BarrageProtoUtil.MessageInfo message) {
        if (message.app_metadata.msgType() != 7) {
            throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Request type cannot be changed after initialization, expected BarrageSnapshotRequest metadata");
        }
    }

    @Override
    public void close() {
        ArrowFlightUtil.HalfClosedState newState = this.halfClosedState.updateAndGet(current -> {
            switch (current) {
                case DONT_CLOSE: {
                    return ArrowFlightUtil.HalfClosedState.CLIENT_HALF_CLOSED;
                }
                case FINISHED_SENDING: {
                    return ArrowFlightUtil.HalfClosedState.CLOSED;
                }
                case CLIENT_HALF_CLOSED: 
                case CLOSED: {
                    throw new IllegalStateException("Can't close twice");
                }
            }
            throw new IllegalStateException("Unknown state " + String.valueOf(current));
        });
        if (newState == ArrowFlightUtil.HalfClosedState.CLOSED) {
            GrpcUtil.safelyComplete(this.listener);
        }
    }

    @AutoService(value={ExchangeRequestHandlerFactory.class})
    public static class BarrageSnapshotRequestHandlerFactory
    implements ExchangeRequestHandlerFactory {
        @Override
        public byte type() {
            return 7;
        }

        @Override
        public ArrowFlightUtil.DoExchangeMarshaller.Handler create(ArrowFlightUtil.DoExchangeMarshaller marshaller, StreamObserver<BarrageMessageWriter.MessageView> listener) {
            return new BarrageSnapshotRequestHandler(marshaller, marshaller.getTicketRouter(), marshaller.getSession(), marshaller.getStreamGeneratorFactory(), listener);
        }
    }
}

