/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.server.arrow;

import com.google.flatbuffers.FlatBufferBuilder;
import com.google.rpc.Code;
import dagger.assisted.Assisted;
import dagger.assisted.AssistedFactory;
import dagger.assisted.AssistedInject;
import gnu.trove.map.hash.TByteObjectHashMap;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.SingletonLivenessManager;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.BarrageMessageWriter;
import io.deephaven.extensions.barrage.BarrageOptions;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.table.BarrageTable;
import io.deephaven.extensions.barrage.util.ArrowToTableConverter;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.arrow.ExchangeMarshaller;
import io.deephaven.server.arrow.ExchangeRequestHandlerFactory;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.util.SafeCloseable;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.Schema;
import org.apache.arrow.flight.impl.Flight;
import org.jetbrains.annotations.NotNull;

public class ArrowFlightUtil {
    private static final Logger log = LoggerFactory.getLogger(ArrowFlightUtil.class);

    public static void DoGetCustom(BarrageMessageWriter.Factory streamGeneratorFactory, SessionState session, TicketRouter ticketRouter, Flight.Ticket request, StreamObserver<InputStream> observer) {
        String ticketLogName = ticketRouter.getLogNameFor(request, "table");
        String description = "FlightService#DoGet(table=" + ticketLogName + ")";
        QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery((String)description, (String)session.getSessionId(), (QueryPerformanceNugget.Factory)QueryPerformanceNugget.DEFAULT_FACTORY);
        try (SafeCloseable ignored = queryPerformanceRecorder.startQuery();){
            SessionState.ExportObject tableExport = ticketRouter.resolve(session, request, "table");
            BarragePerformanceLog.SnapshotMetricsHelper metrics = new BarragePerformanceLog.SnapshotMetricsHelper();
            long queueStartTm = System.nanoTime();
            session.nonExport().queryPerformanceRecorder(queryPerformanceRecorder).require(tableExport).onError(observer).onSuccess(observer).submit(() -> {
                metrics.queueNanos = System.nanoTime() - queueStartTm;
                Object export = tableExport.get();
                if (export instanceof Table) {
                    export = ((Table)export).coalesce();
                }
                if (!(export instanceof BaseTable)) {
                    throw Exceptions.statusRuntimeException((Code)Code.FAILED_PRECONDITION, (String)("Ticket (" + ticketLogName + ") is not a subscribable table."));
                }
                BaseTable table = (BaseTable)export;
                metrics.tableId = Integer.toHexString(System.identityHashCode(table));
                metrics.tableKey = BarragePerformanceLog.getKeyFor((Table)table);
                MessageViewAdapter listener = new MessageViewAdapter(observer);
                listener.onNext(streamGeneratorFactory.getSchemaView(fbb -> BarrageUtil.makeTableSchemaPayload((FlatBufferBuilder)fbb, (BarrageOptions)BarrageUtil.DEFAULT_SNAPSHOT_OPTIONS, (TableDefinition)table.getDefinition(), (Map)table.getAttributes(), (boolean)table.isFlat())));
                BarrageUtil.createAndSendSnapshot((BarrageMessageWriter.Factory)streamGeneratorFactory, (BaseTable)table, null, null, (boolean)false, (BarrageSnapshotOptions)BarrageUtil.DEFAULT_SNAPSHOT_OPTIONS, (StreamObserver)listener, (BarragePerformanceLog.SnapshotMetricsHelper)metrics);
            });
        }
    }

    private static class MessageViewAdapter
    implements StreamObserver<BarrageMessageWriter.MessageView> {
        private final StreamObserver<InputStream> delegate;

        private MessageViewAdapter(StreamObserver<InputStream> delegate) {
            this.delegate = delegate;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onNext(BarrageMessageWriter.MessageView value) {
            StreamObserver<InputStream> streamObserver = this.delegate;
            synchronized (streamObserver) {
                try {
                    value.forEachStream(arg_0 -> this.delegate.onNext(arg_0));
                }
                catch (IOException e) {
                    throw new UncheckedDeephavenException((Throwable)e);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onError(Throwable t) {
            StreamObserver<InputStream> streamObserver = this.delegate;
            synchronized (streamObserver) {
                this.delegate.onError(t);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onCompleted() {
            StreamObserver<InputStream> streamObserver = this.delegate;
            synchronized (streamObserver) {
                this.delegate.onCompleted();
            }
        }
    }

    public static class DoExchangeMarshaller
    extends SingletonLivenessManager
    implements StreamObserver<InputStream>,
    Closeable {
        private final String myPrefix = "DoExchangeMarshaller{" + Integer.toHexString(System.identityHashCode(this)) + "}: ";
        private final SessionState session;
        private final StreamObserver<BarrageMessageWriter.MessageView> listener;
        private boolean isClosed = false;
        private boolean isFirstMsg = true;
        private final TicketRouter ticketRouter;
        private final BarrageMessageWriter.Factory streamGeneratorFactory;
        private final SessionService.ErrorTransformer errorTransformer;
        private final TByteObjectHashMap<ExchangeRequestHandlerFactory> requestHandlerFactories;
        private final List<ExchangeMarshaller> marshallers;
        private Handler requestHandler = null;

        @AssistedInject
        public DoExchangeMarshaller(TicketRouter ticketRouter, BarrageMessageWriter.Factory streamGeneratorFactory, List<ExchangeMarshaller> exchangeMarshallers, Set<ExchangeRequestHandlerFactory> requestHandlerFactories, SessionService.ErrorTransformer errorTransformer, @Assisted SessionState session, @Assisted StreamObserver<InputStream> responseObserver) {
            this.ticketRouter = ticketRouter;
            this.streamGeneratorFactory = streamGeneratorFactory;
            this.session = session;
            this.listener = new MessageViewAdapter(responseObserver);
            this.errorTransformer = errorTransformer;
            this.marshallers = exchangeMarshallers;
            this.requestHandlerFactories = new TByteObjectHashMap(requestHandlerFactories.size());
            for (ExchangeRequestHandlerFactory factory : requestHandlerFactories) {
                ExchangeRequestHandlerFactory old = (ExchangeRequestHandlerFactory)this.requestHandlerFactories.put(factory.type(), (Object)factory);
                if (old == null) continue;
                throw new IllegalStateException("Cannot have multiple registered factories for type " + factory.type() + ", existing=" + String.valueOf(old) + ", new=" + String.valueOf(factory));
            }
            this.session.addOnCloseCallback(this);
            if (responseObserver instanceof ServerCallStreamObserver) {
                ((ServerCallStreamObserver)responseObserver).setOnCancelHandler(this::onCancel);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onNext(InputStream request) {
            BarrageProtoUtil.MessageInfo message;
            try {
                message = BarrageProtoUtil.parseProtoMessage((InputStream)request);
            }
            catch (IOException err) {
                throw this.errorTransformer.transform(err);
            }
            DoExchangeMarshaller doExchangeMarshaller = this;
            synchronized (doExchangeMarshaller) {
                if (this.requestHandler != null) {
                    this.requestHandler.handleMessage(message);
                    return;
                }
                if (message.app_metadata != null) {
                    byte type = message.app_metadata.msgType();
                    ExchangeRequestHandlerFactory factory = (ExchangeRequestHandlerFactory)this.requestHandlerFactories.get(type);
                    if (factory == null) {
                        throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)(this.myPrefix + "received a message with unhandled BarrageMessageType: " + type));
                    }
                    this.requestHandler = factory.create(this, this.listener);
                    if (this.requestHandler == null) {
                        throw new IllegalStateException("ExchangeRequestHandlerFactory returned null for message of type " + type);
                    }
                    this.requestHandler.handleMessage(message);
                    return;
                }
                if (!this.isFirstMsg) {
                    throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)(this.myPrefix + "failed to receive Barrage request metadata"));
                }
                this.isFirstMsg = false;
            }
        }

        public List<ExchangeMarshaller> getMarshallers() {
            return this.marshallers;
        }

        public boolean isClosed() {
            return this.isClosed;
        }

        public BarrageMessageWriter.Factory getStreamGeneratorFactory() {
            return this.streamGeneratorFactory;
        }

        public TicketRouter getTicketRouter() {
            return this.ticketRouter;
        }

        public SessionState getSession() {
            return this.session;
        }

        public void onCancel() {
            log.debug().append((CharSequence)this.myPrefix).append((CharSequence)"cancel requested").endl();
            this.tryClose();
        }

        public void onError(Throwable t) {
            GrpcUtil.safelyError(this.listener, (StatusRuntimeException)this.errorTransformer.transform(t));
            this.tryClose();
        }

        public void onCompleted() {
            log.debug().append((CharSequence)this.myPrefix).append((CharSequence)"client stream closed subscription").endl();
            this.tryClose();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            DoExchangeMarshaller doExchangeMarshaller = this;
            synchronized (doExchangeMarshaller) {
                if (this.isClosed) {
                    return;
                }
                this.isClosed = true;
            }
            try {
                if (this.requestHandler != null) {
                    this.requestHandler.close();
                }
            }
            catch (IOException err) {
                throw this.errorTransformer.transform(err);
            }
            this.release();
        }

        private void tryClose() {
            if (this.session.removeOnCloseCallback(this)) {
                this.close();
            }
        }

        public static interface Handler
        extends Closeable {
            public void handleMessage(@NotNull BarrageProtoUtil.MessageInfo var1);
        }

        @AssistedFactory
        public static interface Factory {
            public DoExchangeMarshaller openExchange(SessionState var1, StreamObserver<InputStream> var2);
        }
    }

    static enum HalfClosedState {
        DONT_CLOSE,
        CLIENT_HALF_CLOSED,
        FINISHED_SENDING,
        CLOSED;

    }

    public static class DoPutObserver
    extends ArrowToTableConverter
    implements StreamObserver<InputStream>,
    Closeable {
        private final SessionState session;
        private final TicketRouter ticketRouter;
        private final SessionService.ErrorTransformer errorTransformer;
        private final StreamObserver<Flight.PutResult> observer;
        private SessionState.ExportBuilder<Table> resultExportBuilder;
        private Flight.FlightDescriptor flightDescriptor;
        private Schema schema;

        public DoPutObserver(SessionState session, TicketRouter ticketRouter, SessionService.ErrorTransformer errorTransformer, StreamObserver<Flight.PutResult> observer) {
            this.session = session;
            this.ticketRouter = ticketRouter;
            this.errorTransformer = errorTransformer;
            this.observer = observer;
            this.session.addOnCloseCallback(this);
            if (observer instanceof ServerCallStreamObserver) {
                ((ServerCallStreamObserver)observer).setOnCancelHandler(this::onCancel);
            }
        }

        public void onNext(InputStream request) {
            BarrageProtoUtil.MessageInfo mi;
            try {
                mi = BarrageProtoUtil.parseProtoMessage((InputStream)request);
            }
            catch (IOException err) {
                throw this.errorTransformer.transform(err);
            }
            if (mi.descriptor != null) {
                if (this.flightDescriptor != null) {
                    if (!this.flightDescriptor.equals((Object)mi.descriptor)) {
                        throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"additional flight descriptor sent does not match original descriptor");
                    }
                } else {
                    this.flightDescriptor = mi.descriptor;
                    this.resultExportBuilder = this.ticketRouter.publish(this.session, mi.descriptor, "Flight.Descriptor", null).onError(this.observer);
                }
            }
            if (mi.app_metadata != null && mi.app_metadata.msgType() == 4) {
                this.options = BarrageSubscriptionOptions.of((BarrageSubscriptionRequest)BarrageSubscriptionRequest.getRootAsBarrageSubscriptionRequest((ByteBuffer)mi.app_metadata.msgPayloadAsByteBuffer()));
            }
            if (mi.header == null) {
                return;
            }
            if (mi.header.headerType() == 1) {
                this.schema = DoPutObserver.parseArrowSchema((BarrageProtoUtil.MessageInfo)mi);
                return;
            }
            if (this.resultTable == null && this.schema != null) {
                this.configureWithSchema(this.schema);
            }
            if (this.resultTable == null) {
                throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Schema must be processed before record-batch messages");
            }
            if (mi.header.headerType() != 3) {
                throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)("Only schema/record-batch messages supported, instead got " + MessageHeader.name((int)mi.header.headerType())));
            }
            int numColumns = this.resultTable.getColumnSources().size();
            BarrageMessage msg = this.createBarrageMessage(mi, numColumns);
            msg.rowsAdded = RowSetFactory.fromRange((long)this.totalRowsRead, (long)(this.totalRowsRead + msg.length - 1L));
            msg.rowsIncluded = msg.rowsAdded.copy();
            msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS;
            this.totalRowsRead += msg.length;
            this.resultTable.handleBarrageMessage(msg);
            GrpcUtil.safelyOnNext(this.observer, (Object)Flight.PutResult.getDefaultInstance());
        }

        private void onCancel() {
            if (this.resultTable != null) {
                this.resultTable.dropReference();
                this.resultTable = null;
            }
            if (this.resultExportBuilder != null) {
                this.resultExportBuilder.submit(() -> {
                    throw Exceptions.statusRuntimeException((Code)Code.CANCELLED, (String)"cancelled");
                });
                this.resultExportBuilder = null;
            }
            this.session.removeOnCloseCallback(this);
        }

        public void onError(Throwable t) {
            if (this.resultTable != null) {
                this.resultTable.dropReference();
                this.resultTable = null;
            }
            if (this.resultExportBuilder != null) {
                this.resultExportBuilder.submit(() -> {
                    throw new UncheckedDeephavenException(t);
                });
                this.resultExportBuilder = null;
            }
            this.session.removeOnCloseCallback(this);
        }

        public void onCompleted() {
            if (this.resultExportBuilder == null) {
                throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Result flight descriptor never provided");
            }
            if (this.resultTable == null && this.schema != null) {
                this.configureWithSchema(this.schema);
            }
            if (this.resultTable == null) {
                throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Result flight schema never provided");
            }
            BarrageTable localResultTable = this.resultTable;
            this.resultTable = null;
            SessionState.ExportBuilder<Table> localExportBuilder = this.resultExportBuilder;
            this.resultExportBuilder = null;
            if (!localExportBuilder.getExport().tryManage((LivenessReferent)localResultTable)) {
                GrpcUtil.safelyError(this.observer, (Code)Code.DATA_LOSS, (String)"Result export already destroyed");
                localResultTable.dropReference();
                this.session.removeOnCloseCallback(this);
                return;
            }
            localResultTable.dropReference();
            localExportBuilder.onSuccess(() -> GrpcUtil.safelyComplete(this.observer)).submit(() -> {
                this.session.removeOnCloseCallback(this);
                return localResultTable;
            });
        }

        @Override
        public void close() {
            GrpcUtil.safelyError(this.observer, (Code)Code.UNAUTHENTICATED, (String)"Session expired");
        }
    }
}

