/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.server.arrow;

import com.google.auto.service.AutoService;
import com.google.flatbuffers.FlatBufferBuilder;
import com.google.rpc.Code;
import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.extensions.barrage.BarrageMessageWriter;
import io.deephaven.extensions.barrage.BarrageOptions;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.arrow.BarrageRequestHelpers;
import io.deephaven.server.arrow.ExchangeMarshaller;
import io.deephaven.server.arrow.ExchangeMarshallerModule;
import io.deephaven.server.barrage.BarrageMessageProducer;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.util.Scheduler;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ReflexiveUse;
import io.grpc.stub.StreamObserver;
import java.util.BitSet;
import java.util.Map;

@ReflexiveUse(referrers={"ExchangeMarshallerModule"})
public class TableExchangeMarshaller
implements ExchangeMarshaller {
    private final BarrageMessageProducer.Operation.Factory bmpOperationFactory;

    public TableExchangeMarshaller(BarrageMessageProducer.Operation.Factory bmpOperationFactory) {
        this.bmpOperationFactory = bmpOperationFactory;
    }

    @Override
    public int priority() {
        return 1000;
    }

    @Override
    public boolean accept(Object export) {
        return export instanceof Table;
    }

    @Override
    public void snapshot(BarrageSnapshotRequest snapshotRequest, BarrageSnapshotOptions options, Object export, BarragePerformanceLog.SnapshotMetricsHelper metrics, StreamObserver<BarrageMessageWriter.MessageView> listener, String ticketLogName, BarrageMessageWriter.Factory streamGeneratorFactory) {
        if (export instanceof Table) {
            export = ((Table)export).coalesce();
        }
        if (!(export instanceof BaseTable)) {
            throw Exceptions.statusRuntimeException((Code)Code.FAILED_PRECONDITION, (String)("Ticket (" + ticketLogName + ") is not a subscribable table."));
        }
        BaseTable table = (BaseTable)export;
        metrics.tableId = Integer.toHexString(System.identityHashCode(table));
        metrics.tableKey = BarragePerformanceLog.getKeyFor((Table)table);
        if (table.isFailed()) {
            throw Exceptions.statusRuntimeException((Code)Code.FAILED_PRECONDITION, (String)"Table is already failed");
        }
        listener.onNext((Object)streamGeneratorFactory.getSchemaView(fbb -> BarrageUtil.makeTableSchemaPayload((FlatBufferBuilder)fbb, (BarrageOptions)options, (TableDefinition)table.getDefinition(), (Map)table.getAttributes(), (boolean)table.isFlat())));
        BitSet columns = BarrageRequestHelpers.getColumns(snapshotRequest);
        RowSet viewport = BarrageRequestHelpers.getViewport(snapshotRequest);
        boolean reverseViewport = snapshotRequest.reverseViewport();
        BarrageUtil.createAndSendSnapshot((BarrageMessageWriter.Factory)streamGeneratorFactory, (BaseTable)table, (BitSet)columns, (RowSet)viewport, (boolean)reverseViewport, (BarrageSnapshotOptions)options, listener, (BarragePerformanceLog.SnapshotMetricsHelper)metrics);
    }

    @Override
    public ExchangeMarshaller.Subscription subscribe(BarrageSubscriptionRequest subscriptionRequest, BarrageSubscriptionOptions options, Object export, StreamObserver<BarrageMessageWriter.MessageView> listener) {
        BarrageMessageProducer bmp;
        QueryTable table = (QueryTable)((Table)export).coalesce();
        long minUpdateIntervalMs = BarrageRequestHelpers.getMinUpdateIntervalMs(subscriptionRequest.subscriptionOptions());
        if (table.isFailed()) {
            throw Exceptions.statusRuntimeException((Code)Code.FAILED_PRECONDITION, (String)"Table is already failed");
        }
        UpdateGraph ug = table.getUpdateGraph();
        try (SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(ug).open();){
            bmp = (BarrageMessageProducer)table.getResult((QueryTable.Operation)this.bmpOperationFactory.create((BaseTable<?>)table, minUpdateIntervalMs));
        }
        BitSet columns = BarrageRequestHelpers.getColumns(subscriptionRequest);
        RowSet viewport = BarrageRequestHelpers.getViewport(subscriptionRequest);
        boolean reverseViewport = subscriptionRequest.reverseViewport();
        bmp.addSubscription(listener, options, columns, viewport, reverseViewport);
        return new Subscription(bmp, listener);
    }

    private static class Subscription
    implements ExchangeMarshaller.Subscription {
        private final BarrageMessageProducer bmp;
        private final StreamObserver<BarrageMessageWriter.MessageView> listener;

        private Subscription(BarrageMessageProducer bmp, StreamObserver<BarrageMessageWriter.MessageView> listener) {
            this.bmp = bmp;
            this.listener = listener;
        }

        @Override
        public LivenessReferent toManage() {
            if (this.bmp.isRefreshing()) {
                return this.bmp;
            }
            return null;
        }

        @Override
        public void close() {
            this.bmp.removeSubscription(this.listener);
        }

        @Override
        public boolean update(BarrageSubscriptionRequest subscriptionRequest) {
            BitSet columns = BarrageRequestHelpers.getColumns(subscriptionRequest);
            RowSet viewport = BarrageRequestHelpers.getViewport(subscriptionRequest);
            boolean reverseViewport = subscriptionRequest.reverseViewport();
            return this.bmp.updateSubscription(this.listener, viewport, columns, reverseViewport);
        }
    }

    @AutoService(value={ExchangeMarshallerModule.Factory.class})
    public static class Factory
    implements ExchangeMarshallerModule.Factory {
        @Override
        public ExchangeMarshaller create(Scheduler scheduler, SessionService.ErrorTransformer errorTransformer, BarrageMessageWriter.Factory streamGeneratorFactory) {
            BarrageMessageProducer.Operation.Factory factory = (parent, updateIntervalMs) -> new BarrageMessageProducer.Operation(scheduler, errorTransformer, streamGeneratorFactory, parent, updateIntervalMs);
            return new TableExchangeMarshaller(factory);
        }
    }
}

