/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.server.hierarchicaltable;

import com.google.flatbuffers.FlatBufferBuilder;
import com.google.rpc.Code;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.util.pools.ChunkPoolConstants;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.BarrageMessageWriter;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.BarrageSubscriptionPerformanceLogger;
import io.deephaven.extensions.barrage.BarrageTypeInfo;
import io.deephaven.extensions.barrage.chunk.ChunkWriter;
import io.deephaven.extensions.barrage.chunk.DefaultChunkWriterFactory;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.extensions.barrage.util.HierarchicalTableSchemaUtil;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.hierarchicaltable.HierarchicalTableView;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.util.Scheduler;
import io.deephaven.util.SafeCloseable;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.LongConsumer;
import javax.annotation.OverridingMethodsMustInvokeSuper;
import org.HdrHistogram.Histogram;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class HierarchicalTableViewSubscription
extends LivenessArtifact {
    // ---- Immutable collaborators, assigned once in the constructor ----

    // Runs propagationJob (immediately or after a delay) and the periodic Stats task.
    private final Scheduler scheduler;
    // Converts a Throwable into the StatusRuntimeException that is sent to the listener on failure.
    private final SessionService.ErrorTransformer errorTransformer;
    // Produces the schema view sent at construction and the message writer used for each snapshot.
    private final BarrageMessageWriter.Factory streamGeneratorFactory;
    // The hierarchical table view (hierarchical table, key table, snapshot state) being snapshotted.
    private final HierarchicalTableView view;
    // Downstream observer that receives the schema, snapshots, errors and completion.
    private final StreamObserver<BarrageMessageWriter.MessageView> listener;
    // Options forwarded to BarrageMessageWriter.getSubView for every snapshot.
    private final BarrageSubscriptionOptions subscriptionOptions;
    // Interval added to lastSnapshotTimeNanos to compute the target time of an update-driven snapshot.
    private final long intervalDurationNanos;
    // Opened around each snapshot build in process().
    private final ExecutionContext executionContext;
    // Performance histograms; null when the scheduler is in test mode or no stats key is available.
    private final Stats stats;
    // Listener on the view's key table; null when the key table is not refreshing.
    private final TableUpdateListener keyTableListener;
    // Listener on the hierarchical table's source; null when the source is not refreshing.
    private final TableUpdateListener sourceTableListener;
    // The Runnable handed to the scheduler; bound to process().
    private final Runnable propagationJob;

    // ---- Scheduling state: the mutable fields in this group are read and written while holding
    // schedulingLock (upstreamFailure is written under it and read by process() after releasing it) ----

    private final Object schedulingLock = new Object();
    // True while a propagationJob run has been requested and not yet consumed by process().
    private boolean snapshotPending;
    // Time (System.nanoTime() basis) the most recently requested run was scheduled for.
    private long scheduledTimeNanos = Long.MAX_VALUE;
    // System.nanoTime() recorded by process() when it last decided to send a snapshot.
    private long lastSnapshotTimeNanos;
    // Set by ChangeListener.onUpdate; consumed (reset to false) by process().
    private boolean upstreamDataChanged;
    // Failure reported by ChangeListener.onFailureInternal; forwarded to the listener by process().
    private Throwable upstreamFailure;
    // Viewport columns requested via setViewport and not yet adopted by process(); null when none pending.
    private BitSet pendingColumns;
    // Viewport rows requested via setViewport and not yet adopted by process(); null when none pending.
    private RowSet pendingRows;

    // ---- Snapshot state: process() holds snapshotLock for its whole body; columns and rows are
    // replaced while additionally holding schedulingLock ----

    private final Object snapshotLock = new Object();
    // Columns included in snapshots; initially every available column.
    private BitSet columns;
    // Rows included in snapshots; initially empty.
    private RowSet rows;
    // Reset to the rowsIncluded of each snapshot after it is sent, and passed to the next getSubView call.
    private final WritableRowSet prevKeyspaceViewportRows = RowSetFactory.empty();

    // Lifecycle state. Volatile so it can be checked without a lock before doing any work;
    // the Active -> Failed transition is made under schedulingLock.
    private volatile State state = State.Active;

    public HierarchicalTableViewSubscription(@NotNull Scheduler scheduler, @NotNull SessionService.ErrorTransformer errorTransformer, @NotNull BarrageMessageWriter.Factory streamGeneratorFactory, @NotNull HierarchicalTableView view, @NotNull StreamObserver<BarrageMessageWriter.MessageView> listener, @NotNull BarrageSubscriptionOptions subscriptionOptions, long intervalDurationMillis) {
        this.scheduler = scheduler;
        this.errorTransformer = errorTransformer;
        this.streamGeneratorFactory = streamGeneratorFactory;
        this.view = view;
        this.listener = listener;
        this.subscriptionOptions = subscriptionOptions;
        this.intervalDurationNanos = TimeUnit.NANOSECONDS.convert(intervalDurationMillis, TimeUnit.MILLISECONDS);
        this.executionContext = ExecutionContext.newBuilder().build();
        String statsKey = BarragePerformanceLog.getKeyFor(view.getHierarchicalTable(), () -> view.getHierarchicalTable().getDescription());
        this.stats = scheduler.inTestMode() || statsKey == null ? null : new Stats(statsKey);
        if (view.getKeyTable().isRefreshing()) {
            this.keyTableListener = new ChangeListener();
            view.getKeyTable().addUpdateListener(this.keyTableListener);
            this.manage((LivenessReferent)this.keyTableListener);
        } else {
            this.keyTableListener = null;
        }
        if (view.getHierarchicalTable().getSource().isRefreshing()) {
            this.sourceTableListener = new ChangeListener();
            view.getHierarchicalTable().getSource().addUpdateListener(this.sourceTableListener);
            this.manage((LivenessReferent)this.sourceTableListener);
        } else {
            this.sourceTableListener = null;
        }
        if (this.keyTableListener != null || this.sourceTableListener != null) {
            this.manage((LivenessReferent)view);
        }
        this.propagationJob = this::process;
        this.columns = new BitSet();
        this.columns.set(0, view.getHierarchicalTable().getAvailableColumnDefinitions().size());
        this.rows = RowSetFactory.empty();
        GrpcUtil.safelyOnNext(listener, (Object)streamGeneratorFactory.getSchemaView(fbb -> HierarchicalTableSchemaUtil.makeSchemaPayload((FlatBufferBuilder)fbb, view.getHierarchicalTable())));
    }

    /**
     * Releases this subscription's upstream hooks: after delegating to {@code super.destroy()}, removes whichever
     * change listeners were registered at construction and stops the periodic statistics task, if any.
     */
    @Override
    @OverridingMethodsMustInvokeSuper
    protected void destroy() {
        super.destroy();
        // Each listener is null when its table was not refreshing at construction time.
        if (keyTableListener != null) {
            view.getKeyTable().removeUpdateListener(keyTableListener);
        }
        if (sourceTableListener != null) {
            view.getHierarchicalTable().getSource().removeUpdateListener(sourceTableListener);
        }
        // stats is null in test mode or when no stats key was available.
        if (stats != null) {
            stats.stop();
        }
    }

    /**
     * Ends this subscription: marks it {@code Done} so that pending and future work is skipped, completes the
     * listener's stream, and forces this artifact's reference count to zero.
     */
    public void completed() {
        // Publish Done first so concurrent process()/listener calls bail out on their state checks.
        state = State.Done;
        GrpcUtil.safelyComplete(listener);
        forceReferenceCountToZero();
    }

    /**
     * Records one snapshot duration into the snapshot-time histogram.
     *
     * @param snapshotNanos elapsed snapshot time in nanoseconds
     */
    private void recordSnapshotNanos(long snapshotNanos) {
        recordMetric(s -> s.snapshotNanos, snapshotNanos);
    }

    /**
     * Records the size and CPU cost of one write: the byte count is converted to bits for the write-size
     * histogram and the CPU time goes to the write-time histogram.
     *
     * @param bytes number of bytes written
     * @param cpuNanos CPU time spent writing, in nanoseconds
     */
    private void recordWriteMetrics(long bytes, long cpuNanos) {
        final long bits = bytes * 8L;
        recordMetric(s -> s.writeBits, bits);
        recordMetric(s -> s.writeNanos, cpuNanos);
    }

    /**
     * Records {@code value} into the histogram selected by {@code histogramGetter}; does nothing when statistics
     * are disabled ({@code stats == null}).
     *
     * @param histogramGetter selects which histogram of the {@code Stats} instance to update
     * @param value the value to record
     */
    private void recordMetric(@NotNull Function<Stats, Histogram> histogramGetter, long value) {
        final Stats target = stats;
        if (target != null) {
            // Stats.run() is a synchronized method on the same object, so recording here cannot interleave
            // with a flush/reset of the histograms.
            synchronized (target) {
                histogramGetter.apply(target).recordValue(value);
            }
        }
    }

    /**
     * The scheduled job: consumes whatever work is pending and either sends one snapshot or delivers the upstream
     * failure to the listener.
     *
     * <p>The whole body runs under {@code snapshotLock}. Pending state is consumed under {@code schedulingLock};
     * the error delivery or snapshot itself happens after {@code schedulingLock} has been released.
     */
    private void process() {
        if (state == State.Done) {
            return;
        }
        synchronized (snapshotLock) {
            final boolean sendError;
            synchronized (schedulingLock) {
                if (!snapshotPending) {
                    // Another run already consumed the request.
                    return;
                }
                snapshotPending = false;

                // Read the volatile state once so both checks below agree.
                final State observedState = state;
                if (observedState == State.Done) {
                    return;
                }

                sendError = observedState == State.Failed;
                if (sendError) {
                    // Terminal: mark Done and discard any pending changes and the current row viewport.
                    state = State.Done;
                    upstreamDataChanged = false;
                    pendingColumns = null;
                    try (RowSet ignored1 = pendingRows;
                         RowSet ignored2 = rows) {
                        pendingRows = null;
                    }
                } else {
                    // Fold pending upstream changes and viewport updates into the active viewport.
                    boolean sendSnapshot = upstreamDataChanged;
                    upstreamDataChanged = false;
                    if (pendingColumns != null) {
                        columns = pendingColumns;
                        pendingColumns = null;
                        sendSnapshot = true;
                    }
                    if (pendingRows != null) {
                        // Closes the previous row set once the replacement is installed.
                        try (RowSet ignored = rows) {
                            rows = pendingRows;
                        }
                        pendingRows = null;
                        sendSnapshot = true;
                    }
                    if (!sendSnapshot) {
                        return;
                    }
                    lastSnapshotTimeNanos = System.nanoTime();
                }
            }

            if (sendError) {
                GrpcUtil.safelyError(listener, (StatusRuntimeException)errorTransformer.transform(upstreamFailure));
                return;
            }

            try (SafeCloseable ignored = executionContext == null ? null : executionContext.open()) {
                buildAndSendSnapshot(streamGeneratorFactory, listener, subscriptionOptions, view, this::recordSnapshotNanos, this::recordWriteMetrics, columns, rows, prevKeyspaceViewportRows);
            } catch (Exception e) {
                // A failed snapshot ends the subscription: report it, then stop accepting work.
                GrpcUtil.safelyError(listener, (StatusRuntimeException)errorTransformer.transform(e));
                state = State.Done;
            }
        }
    }

    /**
     * Takes one snapshot of {@code view} for the given viewport, wraps it in a snapshot {@link BarrageMessage},
     * and sends the resulting sub-view to {@code listener}.
     *
     * @param messageWriterFactory creates the writer for the populated message
     * @param listener receives the sub-view
     * @param subscriptionOptions options forwarded to {@code getSubView}
     * @param view supplies the hierarchical table, snapshot state, key table and key-table action column
     * @param snapshotNanosConsumer receives the elapsed time of the {@code snapshot} call, in nanoseconds
     * @param writeMetricsConsumer passed to the message writer for write metrics
     * @param columns indexes (into the available column definitions) of the columns to include
     * @param rows the row viewport to snapshot
     * @param prevKeyspaceViewportRows passed to {@code getSubView}, then reset to this snapshot's included rows
     */
    private static void buildAndSendSnapshot(@NotNull BarrageMessageWriter.Factory messageWriterFactory, @NotNull StreamObserver<BarrageMessageWriter.MessageView> listener, @NotNull BarrageSubscriptionOptions subscriptionOptions, @NotNull HierarchicalTableView view, @NotNull LongConsumer snapshotNanosConsumer, @NotNull BarragePerformanceLog.WriteMetricsConsumer writeMetricsConsumer, @NotNull BitSet columns, @NotNull RowSet rows, @NotNull WritableRowSet prevKeyspaceViewportRows) {
        final List availableColumns = view.getHierarchicalTable().getAvailableColumnDefinitions();
        final int availableColumnCount = availableColumns.size();

        // One destination chunk per requested column, in ascending column-index order, each sized to the viewport.
        final int chunkCapacity = rows.intSize();
        final WritableChunk[] destinations = new WritableChunk[columns.cardinality()];
        int filled = 0;
        for (int requested = columns.nextSetBit(0); requested >= 0; requested = columns.nextSetBit(requested + 1)) {
            final ColumnDefinition requestedDefinition = (ColumnDefinition)availableColumns.get(requested);
            destinations[filled++] = ReinterpretUtils.maybeConvertToPrimitiveChunkType((Class)requestedDefinition.getDataType()).makeWritableChunk(chunkCapacity);
        }

        // Time only the snapshot call itself.
        final long snapshotStartNanos = System.nanoTime();
        final long expandedSize = view.getHierarchicalTable().snapshot(view.getSnapshotState(), view.getKeyTable(), view.getKeyTableActionColumn(), columns, (RowSequence)rows, destinations);
        snapshotNanosConsumer.accept(System.nanoTime() - snapshotStartNanos);

        final BarrageMessage barrageMessage = new BarrageMessage();
        barrageMessage.isSnapshot = true;
        barrageMessage.rowsAdded = RowSetFactory.flat((long)expandedSize);
        // Included rows are the requested range clipped to the expanded size; nothing is included when the
        // viewport is empty or starts at or beyond the expanded size.
        if (rows.isEmpty() || expandedSize <= rows.firstRowKey()) {
            barrageMessage.rowsIncluded = RowSetFactory.empty();
        } else {
            barrageMessage.rowsIncluded = RowSetFactory.fromRange((long)rows.firstRowKey(), (long)Math.min(expandedSize - 1L, rows.lastRowKey()));
        }
        barrageMessage.rowsRemoved = RowSetFactory.empty();
        barrageMessage.shifted = RowSetShiftData.EMPTY;
        barrageMessage.tableSize = expandedSize;
        barrageMessage.addColumnData = new BarrageMessage.AddColumnData[availableColumnCount];

        // Every available column gets an AddColumnData entry and a chunk writer; only requested columns carry data.
        final ChunkWriter[] chunkWriters = new ChunkWriter[availableColumnCount];
        int nextDestination = 0;
        for (int columnIndex = 0; columnIndex < availableColumnCount; ++columnIndex) {
            final ColumnDefinition columnDefinition = (ColumnDefinition)availableColumns.get(columnIndex);
            final BarrageMessage.AddColumnData addColumnData = new BarrageMessage.AddColumnData();
            addColumnData.type = columnDefinition.getDataType();
            addColumnData.componentType = columnDefinition.getComponentType();
            addColumnData.data = new ArrayList();
            if (columns.get(columnIndex)) {
                // Destinations were allocated in ascending requested-column order, so consume them in order.
                final WritableChunk columnData = destinations[nextDestination++];
                addColumnData.data.add(columnData);
                addColumnData.chunkType = columnData.getChunkType();
            } else {
                addColumnData.chunkType = ReinterpretUtils.maybeConvertToPrimitiveChunkType((Class)columnDefinition.getDataType());
            }
            barrageMessage.addColumnData[columnIndex] = addColumnData;
            chunkWriters[columnIndex] = DefaultChunkWriterFactory.INSTANCE.newWriter(BarrageTypeInfo.make((Class)ReinterpretUtils.maybeConvertToPrimitiveDataType((Class)columnDefinition.getDataType()), (Class)columnDefinition.getComponentType(), (Object)BarrageUtil.flatbufFieldFor((ColumnDefinition)columnDefinition, Map.of())));
        }
        barrageMessage.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS;

        try (BarrageMessageWriter bmw = messageWriterFactory.newMessageWriter(barrageMessage, chunkWriters, writeMetricsConsumer)) {
            // Both flags are passed as false for every snapshot.
            final boolean initialSnapshot = false;
            final boolean isFullSubscription = false;
            GrpcUtil.safelyOnNext(listener, (Object)bmw.getSubView(subscriptionOptions, initialSnapshot, isFullSubscription, rows, false, (RowSet)prevKeyspaceViewportRows, barrageMessage.rowsIncluded, columns));
            // Remember what was included so the next snapshot can be expressed relative to it.
            prevKeyspaceViewportRows.resetTo(barrageMessage.rowsIncluded);
        }
    }

    /**
     * Requests a new viewport. The arguments are validated and copied, recorded as pending, and a snapshot is
     * scheduled to run immediately. A {@code null} argument leaves that dimension of the viewport unchanged; if
     * both are {@code null} nothing is scheduled. Calls made when the subscription is not {@code Active} are
     * ignored.
     *
     * @param viewportColumns the columns to include, as indexes into the available column definitions, or
     *        {@code null} to keep the current columns; not retained (a clone is stored)
     * @param viewportRows the rows to include, or {@code null} to keep the current rows; must be contiguous and no
     *        larger than {@code ChunkPoolConstants.LARGEST_POOLED_CHUNK_CAPACITY}; not retained (a copy is stored)
     * @param reverseViewport must be {@code false}; reverse viewports are rejected
     * @throws RuntimeException the {@code INVALID_ARGUMENT} status exception built by
     *         {@code Exceptions.statusRuntimeException} when any argument is rejected
     */
    public void setViewport(@Nullable BitSet viewportColumns, @Nullable RowSet viewportRows, boolean reverseViewport) {
        // Cheap unlocked check; repeated under the lock below before any state is modified.
        if (state != State.Active) {
            return;
        }
        if (viewportColumns != null && viewportColumns.length() > view.getHierarchicalTable().getAvailableColumnDefinitions().size()) {
            throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, String.format("Requested columns out of range: length=%d, available length=%d", viewportColumns.length(), view.getHierarchicalTable().getAvailableColumnDefinitions().size()));
        }
        if (viewportRows != null) {
            if (!viewportRows.isContiguous()) {
                throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "HierarchicalTableView subscriptions only support contiguous viewports");
            }
            if (viewportRows.size() > (long)ChunkPoolConstants.LARGEST_POOLED_CHUNK_CAPACITY) {
                throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, String.format("HierarchicalTableView subscriptions only support viewport size up to %d rows, requested %d rows", ChunkPoolConstants.LARGEST_POOLED_CHUNK_CAPACITY, viewportRows.size()));
            }
        }
        if (reverseViewport) {
            throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "HierarchicalTableView subscriptions do not support reverse viewports");
        }

        // Copy outside the lock so the caller keeps ownership of its arguments and the lock is held briefly.
        final BitSet newColumns = viewportColumns == null ? null : (BitSet)viewportColumns.clone();
        final WritableRowSet newRows = viewportRows == null ? null : viewportRows.copy();
        final long currentTimeNanos = System.nanoTime();
        synchronized (schedulingLock) {
            if (state != State.Active) {
                // The subscription ended between the unlocked check and here. The copy made above is owned by
                // this method and will never be handed off, so release it instead of leaking it.
                if (newRows != null) {
                    newRows.close();
                }
                return;
            }
            if (newColumns != null) {
                pendingColumns = newColumns;
            }
            if (newRows != null) {
                // Closes any superseded, not-yet-consumed row viewport after installing the new one.
                try (RowSet ignored = pendingRows) {
                    pendingRows = newRows;
                }
            }
            if (newColumns != null || newRows != null) {
                scheduleImmediately(currentTimeNanos);
            }
        }
    }

    /**
     * Requests a run of the propagation job right now, unless a run is already pending for a time at or before
     * {@code currentTimeNanos}. Must be called while holding {@code schedulingLock}.
     *
     * @param currentTimeNanos the caller's {@code System.nanoTime()} reading
     */
    private void scheduleImmediately(long currentTimeNanos) {
        Assert.assertion(Thread.holdsLock(schedulingLock), "Thread.holdsLock(schedulingLock)");
        final boolean alreadyCovered = snapshotPending && currentTimeNanos >= scheduledTimeNanos;
        if (alreadyCovered) {
            return;
        }
        snapshotPending = true;
        scheduledTimeNanos = currentTimeNanos;
        scheduler.runImmediately(propagationJob);
    }

    /**
     * Requests a run of the propagation job one interval after the last snapshot. If that time has already
     * passed the run is requested immediately; if a run is already pending for that time or earlier, nothing is
     * scheduled. Must be called while holding {@code schedulingLock}.
     *
     * @param currentTimeNanos the caller's {@code System.nanoTime()} reading
     */
    private void scheduleAtInterval(long currentTimeNanos) {
        Assert.assertion(Thread.holdsLock(schedulingLock), "Thread.holdsLock(schedulingLock)");
        final long targetTimeNanos = lastSnapshotTimeNanos + intervalDurationNanos;
        final long delayNanos = targetTimeNanos - currentTimeNanos;
        if (delayNanos < 0L) {
            // The interval has already elapsed.
            scheduleImmediately(currentTimeNanos);
            return;
        }
        if (snapshotPending && targetTimeNanos >= scheduledTimeNanos) {
            // An equal-or-earlier run is already pending.
            return;
        }
        snapshotPending = true;
        scheduledTimeNanos = targetTimeNanos;
        // The scheduler takes milliseconds; the conversion truncates toward zero.
        scheduler.runAfterDelay(TimeUnit.MILLISECONDS.convert(delayNanos, TimeUnit.NANOSECONDS), propagationJob);
    }

    private static enum State {
        Active,
        Failed,
        Done;

    }

    /**
     * Collects per-subscription performance histograms and periodically flushes them to the Barrage subscription
     * performance logger. An instance schedules itself on the enclosing subscription's scheduler when created and
     * re-schedules itself on every run until {@link #stop()} is called.
     *
     * <p>{@link #run()} is synchronized on this object, so code that records into the histograms should hold
     * this object's monitor too.
     */
    private class Stats implements Runnable {
        /** Number of significant value digits used by every histogram in this class. */
        private static final int NUM_SIG_FIGS = 3;

        private final String statsKey;
        /** Hex identity hash of the enclosing subscription; passed to the logger with each entry. */
        private final String statsId;
        private final Histogram snapshotNanos = new Histogram(NUM_SIG_FIGS);
        private final Histogram writeNanos = new Histogram(NUM_SIG_FIGS);
        private final Histogram writeBits = new Histogram(NUM_SIG_FIGS);
        /** Cleared by {@link #stop()}; checked by {@link #run()} before re-scheduling. */
        private volatile boolean running = true;

        private Stats(String statsKey) {
            this.statsKey = statsKey;
            this.statsId = Integer.toHexString(System.identityHashCode(HierarchicalTableViewSubscription.this));
            // Schedule the first flush; all fields above are already initialized at this point.
            scheduler.runAfterDelay(BarragePerformanceLog.CYCLE_DURATION_MILLIS, this);
        }

        /** Stops re-scheduling. An already-scheduled run still executes and flushes once more. */
        private void stop() {
            running = false;
        }

        /**
         * Re-schedules itself (while still running) and flushes every non-empty histogram to the subscription
         * performance logger, holding the logger's monitor for the whole batch.
         */
        @Override
        public synchronized void run() {
            final Instant now = scheduler.instantMillis();
            if (running) {
                scheduler.runAfterDelay(BarragePerformanceLog.CYCLE_DURATION_MILLIS, this);
            }
            final BarrageSubscriptionPerformanceLogger logger = BarragePerformanceLog.getInstance().getSubscriptionLogger();
            synchronized (logger) {
                // NOTE(review): the labels say Millis/Megabits while the histograms hold nanos/bits;
                // presumably the logger converts units - confirm.
                flush(now, logger, snapshotNanos, "SnapshotMillis");
                flush(now, logger, writeNanos, "WriteMillis");
                flush(now, logger, writeBits, "WriteMegabits");
            }
        }

        /** Logs {@code hist} under {@code statType} and resets it; does nothing when the histogram is empty. */
        private void flush(@NotNull Instant now, @NotNull BarrageSubscriptionPerformanceLogger logger, @NotNull Histogram hist, @NotNull String statType) {
            if (hist.getTotalCount() == 0L) {
                return;
            }
            logger.log(statsId, statsKey, statType, now, hist);
            hist.reset();
        }
    }

    /**
     * Listener attached to the key table and/or the hierarchical table's source. Updates mark the enclosing
     * subscription as changed and schedule an interval-paced snapshot; failures move the subscription to
     * {@code Failed} and schedule immediate error delivery.
     */
    private class ChangeListener extends InstrumentedTableUpdateListener {
        private ChangeListener() {
            super("HierarchicalTableViewSubscription.ChangeListener");
        }

        /**
         * Flags upstream data as changed and schedules a snapshot at the subscription's interval. The update's
         * contents are not inspected. Ignored unless the subscription is {@code Active}.
         */
        public void onUpdate(@NotNull TableUpdate upstream) {
            // Outer-instance members are always reached through this alias so they can never be confused with
            // members inherited from the listener base class.
            final HierarchicalTableViewSubscription subscription = HierarchicalTableViewSubscription.this;
            if (subscription.state != State.Active) {
                return;
            }
            final long currentTimeNanos = System.nanoTime();
            synchronized (subscription.schedulingLock) {
                // Re-check under the lock: the state may have changed since the unlocked read.
                if (subscription.state != State.Active) {
                    return;
                }
                subscription.upstreamDataChanged = true;
                subscription.scheduleAtInterval(currentTimeNanos);
            }
        }

        /**
         * Records the upstream failure, marks the subscription {@code Failed}, and schedules an immediate job run
         * to deliver the error. Ignored unless the subscription is {@code Active}.
         */
        protected void onFailureInternal(@NotNull Throwable originalException, @Nullable TableListener.Entry sourceEntry) {
            final HierarchicalTableViewSubscription subscription = HierarchicalTableViewSubscription.this;
            if (subscription.state != State.Active) {
                return;
            }
            // Force the SUBSCRIPTION's reference count to zero (not this listener's), before taking the lock.
            subscription.forceReferenceCountToZero();
            final long currentTimeNanos = System.nanoTime();
            synchronized (subscription.schedulingLock) {
                if (subscription.state != State.Active) {
                    return;
                }
                subscription.upstreamFailure = originalException;
                subscription.state = State.Failed;
                subscription.scheduleImmediately(currentTimeNanos);
            }
        }
    }

    /**
     * Creates {@link HierarchicalTableViewSubscription} instances from per-subscription arguments.
     */
    public interface Factory {
        /**
         * NOTE(review): the parameter types match the last four constructor parameters (view, listener,
         * subscription options, interval in milliseconds); presumably they are passed through in that order -
         * confirm against the implementation.
         *
         * @param var1 the hierarchical table view
         * @param var2 the observer for the subscription's messages
         * @param var3 the subscription options
         * @param var4 presumably the interval duration in milliseconds
         * @return the new subscription
         */
        HierarchicalTableViewSubscription create(HierarchicalTableView var1, StreamObserver<BarrageMessageWriter.MessageView> var2, BarrageSubscriptionOptions var3, long var4);
    }
}

