/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.server.barrage;

import com.google.common.annotations.VisibleForTesting;
import com.google.flatbuffers.FlatBufferBuilder;
import com.google.rpc.Code;
import io.deephaven.base.formatters.FormatBitSet;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ResettableWritableChunk;
import io.deephaven.chunk.ResettableWritableObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.ChunkPoolConstants;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSequenceFactory;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderRandom;
import io.deephaven.engine.rowset.RowSetBuilderSequential;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ChunkSink;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.SharedContext;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener;
import io.deephaven.engine.table.impl.MemoizedOperationKey;
import io.deephaven.engine.table.impl.NotificationStepReceiver;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.table.impl.remote.ConstructSnapshot;
import io.deephaven.engine.table.impl.select.VectorChunkAdapter;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.FillUnordered;
import io.deephaven.engine.table.impl.sources.ObjectArraySource;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.table.impl.util.ShiftInversionHelper;
import io.deephaven.engine.table.impl.util.UpdateCoalescer;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.updategraph.LogicalClock;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.extensions.barrage.BarrageMessageWriter;
import io.deephaven.extensions.barrage.BarrageOptions;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.BarrageSubscriptionPerformanceLogger;
import io.deephaven.extensions.barrage.BarrageTypeInfo;
import io.deephaven.extensions.barrage.chunk.ChunkWriter;
import io.deephaven.extensions.barrage.chunk.DefaultChunkWriterFactory;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.flight.util.SchemaHelper;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.util.Scheduler;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseableArray;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.vector.Vector;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Stream;
import javax.annotation.OverridingMethodsMustInvokeSuper;
import org.HdrHistogram.Histogram;
import org.apache.arrow.flatbuf.Schema;
import org.apache.commons.lang3.mutable.MutableInt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class BarrageMessageProducer
extends LivenessArtifact
implements DynamicNode,
NotificationStepReceiver {
    /** Maximum number of rows copied per chunk when recording deltas; read from configuration key "deltaChunkSize". */
    private static final int DELTA_CHUNK_SIZE = Configuration.getInstance().getIntegerForClassWithDefault(BarrageMessageProducer.class, "deltaChunkSize", ChunkPoolConstants.LARGEST_POOLED_CHUNK_CAPACITY);
    private static final Logger log = LoggerFactory.getLogger(BarrageMessageProducer.class);
    /** Configuration flag (default false); when false the snapshot row budget is unbounded ({@code Long.MAX_VALUE}). */
    public static final boolean SUBSCRIPTION_GROWTH_ENABLED = Configuration.getInstance().getBooleanForClassWithDefault(BarrageMessageProducer.class, "subscriptionGrowthEnabled", false);
    // Target number of cells per snapshot; clamped between BarrageUtil's MIN/MAX cell counts when growth is enabled.
    private long snapshotTargetCellCount = BarrageUtil.MIN_SNAPSHOT_CELL_COUNT;
    // NOTE(review): presumably a running estimate of snapshot cost per cell; not used in the visible code - confirm.
    private double snapshotNanosPerCell = 0.0;
    // Prefix for every log line of this instance; built from the identity hash code in the constructor.
    private final String logPrefix;
    private final Scheduler scheduler;
    private final SessionService.ErrorTransformer errorTransformer;
    private final BarrageMessageWriter.Factory streamGeneratorFactory;
    // The table whose updates are being published.
    private final BaseTable<?> parent;
    // Interval (ms) added to lastUpdateTime to compute the next deferred propagation run.
    private final long updateIntervalMs;
    // Scheduler time (ms) at which the most recent update job started.
    private volatile long lastUpdateTime = 0L;
    // Scheduler time (ms) of the most recently scheduled deferred run; 0 when none has been scheduled.
    private volatile long lastScheduledUpdateTime = 0L;
    private final boolean isBlinkTable;
    // Running count of rows added to a blink table, accumulated by enqueueUpdate.
    private long blinkTableUpdateSize = 0L;
    // NOTE(review): not referenced in the visible code - presumably the value of blinkTableUpdateSize at the last propagation.
    private long lastBlinkTableUpdateSize = 0L;
    // Null in scheduler test mode or when the parent has no performance-log key.
    private final Stats stats;
    // Per-column sources read when recording deltas (vector-typed columns are wrapped in a VectorChunkAdapter).
    private final ChunkSource.WithPrev<Values>[] chunkSources;
    private final ChunkWriter<Chunk<Values>>[] chunkWriters;
    // Columns whose delta column is an ObjectArraySource.
    private final BitSet objectColumns = new BitSet();
    // Column data/component types as reported by the parent before primitive reinterpretation.
    private final Class<?>[] realColumnType;
    private final Class<?>[] realColumnComponentType;
    private final WritableRowSet propagationRowSet;
    private long parentTableSize;
    // Next unused destination row key in deltaColumns.
    private long nextFreeDeltaKey = 0L;
    // In-memory column sources that receive copies of added/modified rows until they are propagated.
    private final WritableColumnSource<?>[] deltaColumns;
    private long lastUpdateClockStep = 0L;
    private Throwable pendingError = null;
    // Deltas recorded by enqueueUpdate that have not yet been propagated.
    private final List<Delta> pendingDeltas = new ArrayList<Delta>();
    private final UpdatePropagationJob updatePropagationJob = new UpdatePropagationJob();
    // Position-space viewports currently used to decide which rows to record; null when there is none.
    private RowSet activeViewport = null;
    private RowSet activeReverseViewport = null;
    // Viewports/columns that take effect after the in-flight snapshot (see the update job).
    private WritableRowSet postSnapshotViewport = null;
    private WritableRowSet postSnapshotReverseViewport = null;
    // Columns currently recorded by enqueueUpdate.
    private final BitSet activeColumns = new BitSet();
    private final BitSet postSnapshotColumns = new BitSet();
    private final BitSet objectColumnsToClear = new BitSet();
    private long numFullSubscriptions = 0L;
    private long numGrowingSubscriptions = 0L;
    // Subscriptions with pending changes; swapped for a fresh list by the update job.
    private List<Subscription> pendingSubscriptions = new ArrayList<Subscription>();
    private final ArrayList<Subscription> activeSubscriptions = new ArrayList();
    // Test hook installed through the constructor or setOnGetSnapshot.
    private Runnable onGetSnapshot;
    private boolean onGetSnapshotIsPreSnap;
    // When false, constructListener() returns null (static parent table).
    private final boolean parentIsRefreshing;
    private final List<Object> parents = Collections.synchronizedList(new ArrayList());

    public BarrageMessageProducer(Scheduler scheduler, SessionService.ErrorTransformer errorTransformer, BarrageMessageWriter.Factory streamGeneratorFactory, BaseTable<?> parent, long updateIntervalMs, Runnable onGetSnapshot) {
        this.logPrefix = "BarrageMessageProducer(" + Integer.toHexString(System.identityHashCode((Object)this)) + "): ";
        this.scheduler = scheduler;
        this.errorTransformer = errorTransformer;
        this.streamGeneratorFactory = streamGeneratorFactory;
        this.parent = parent;
        this.isBlinkTable = parent.isBlink();
        String tableKey = BarragePerformanceLog.getKeyFor(parent);
        this.stats = scheduler.inTestMode() || tableKey == null ? null : new Stats(tableKey);
        this.propagationRowSet = RowSetFactory.empty();
        this.updateIntervalMs = updateIntervalMs;
        this.onGetSnapshot = onGetSnapshot;
        this.parentTableSize = parent.size();
        this.parentIsRefreshing = parent.isRefreshing();
        if (log.isDebugEnabled()) {
            log.debug().append((CharSequence)this.logPrefix).append((CharSequence)"Creating new BarrageMessageProducer for ").append(System.identityHashCode(parent)).append((CharSequence)" with an interval of ").append(updateIntervalMs).endl();
        }
        ColumnSource[] sources = parent.getColumnSources().toArray(ColumnSource.ZERO_LENGTH_COLUMN_SOURCE_ARRAY);
        this.chunkSources = new ChunkSource.WithPrev[sources.length];
        this.deltaColumns = new WritableColumnSource[sources.length];
        this.realColumnType = new Class[sources.length];
        this.realColumnComponentType = new Class[sources.length];
        this.chunkWriters = new ChunkWriter[sources.length];
        MutableInt mi = new MutableInt();
        Schema schema = SchemaHelper.flatbufSchema((ByteBuffer)BarrageUtil.schemaBytesFromTable(parent).asReadOnlyByteBuffer());
        parent.getColumnSourceMap().forEach((columnName, columnSource) -> {
            int ii = mi.getAndIncrement();
            this.chunkWriters[ii] = DefaultChunkWriterFactory.INSTANCE.newWriter(BarrageTypeInfo.make((Class)ReinterpretUtils.maybeConvertToPrimitiveDataType((Class)columnSource.getType()), (Class)columnSource.getComponentType(), (Object)schema.fields(ii)));
        });
        boolean capacity = false;
        for (int ci = 0; ci < sources.length; ++ci) {
            this.realColumnType[ci] = sources[ci].getType();
            this.realColumnComponentType[ci] = sources[ci].getComponentType();
            sources[ci] = ReinterpretUtils.maybeConvertToPrimitive((ColumnSource)sources[ci]);
            this.chunkSources[ci] = Vector.class.isAssignableFrom(sources[ci].getType()) ? new VectorChunkAdapter((ChunkSource.WithPrev)sources[ci]) : sources[ci];
            this.deltaColumns[ci] = ArrayBackedColumnSource.getMemoryColumnSource((long)0L, (Class)sources[ci].getType(), (Class)sources[ci].getComponentType());
            if (!(this.deltaColumns[ci] instanceof ObjectArraySource)) continue;
            this.objectColumns.set(ci);
        }
    }

    /**
     * Test hook exposing the parent table's row set.
     *
     * @return the row set returned by the parent table
     */
    @VisibleForTesting
    public RowSet getRowSet() {
        final RowSet parentRowSet = this.parent.getRowSet();
        return parentRowSet;
    }

    /**
     * Test hook exposing the parent table's definition.
     *
     * @return the definition returned by the parent table
     */
    @VisibleForTesting
    public TableDefinition getTableDefinition() {
        final TableDefinition parentDefinition = this.parent.getDefinition();
        return parentDefinition;
    }

    /**
     * Test hook that replaces the snapshot callback and records whether it is a pre-snapshot callback.
     *
     * @param onGetSnapshot the callback to store
     * @param isPreSnap     stored alongside the callback as {@code onGetSnapshotIsPreSnap}
     */
    @VisibleForTesting
    public void setOnGetSnapshot(Runnable onGetSnapshot, boolean isPreSnap) {
        this.onGetSnapshotIsPreSnap = isPreSnap;
        this.onGetSnapshot = onGetSnapshot;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a new subscription and schedules the propagation job immediately so the initial snapshot is sent.
     *
     * @param listener           the observer that will receive messages; must not already be subscribed
     * @param options            subscription options passed to the new {@code Subscription}
     * @param columnsToSubscribe columns to subscribe to, or {@code null} for all columns; the bit set is cloned
     * @param initialViewport    initial viewport, or {@code null}
     * @param reverseViewport    whether the viewport is a reverse viewport; rejected for blink tables with an
     *                           {@code INVALID_ARGUMENT} error sent to {@code listener}
     * @throws IllegalStateException if {@code listener} already has an active or pending subscription
     */
    public void addSubscription(StreamObserver<BarrageMessageWriter.MessageView> listener, BarrageSubscriptionOptions options, @Nullable BitSet columnsToSubscribe, @Nullable RowSet initialViewport, boolean reverseViewport) {
        synchronized (this) {
            // Listeners are matched by identity against both the active and the pending subscriptions.
            boolean alreadySubscribed = this.activeSubscriptions.stream().anyMatch(existing -> existing.listener == listener);
            if (!alreadySubscribed) {
                alreadySubscribed = this.pendingSubscriptions.stream().anyMatch(existing -> existing.listener == listener);
            }
            if (alreadySubscribed) {
                throw new IllegalStateException("Asking to add a subscription for an already existing session and listener");
            }

            if (reverseViewport && this.isBlinkTable) {
                GrpcUtil.safelyError(listener, Code.INVALID_ARGUMENT, "Reverse viewport is not supported for blink tables");
                return;
            }

            BitSet subscribedColumns;
            if (columnsToSubscribe != null) {
                subscribedColumns = (BitSet)columnsToSubscribe.clone();
            } else {
                // No explicit selection means every column.
                int columnCount = this.chunkSources.length;
                subscribedColumns = new BitSet(columnCount);
                subscribedColumns.set(0, columnCount);
            }

            Subscription created = new Subscription(listener, options, subscribedColumns, initialViewport, reverseViewport);
            if (log.isDebugEnabled()) {
                log.debug()
                        .append(this.logPrefix)
                        .append(created.logPrefix)
                        .append("subbing to columns ")
                        .append(FormatBitSet.formatBitSet(subscribedColumns))
                        .append(" and scheduling update immediately, for initial snapshot.")
                        .endl();
            }

            created.hasPendingUpdate = true;
            this.pendingSubscriptions.add(created);
            this.updatePropagationJob.scheduleImmediately();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Finds the subscription belonging to {@code listener}, applies {@code updateSubscription} to it, marks it as
     * having a pending update and schedules the propagation job immediately.
     *
     * <p>The active subscriptions are searched first, then the pending ones; only the first match is updated.
     * Listeners are compared by identity. The whole search runs while holding this producer's monitor.
     *
     * @param listener           the observer identifying the subscription
     * @param updateSubscription mutation to apply to the matching subscription
     * @return {@code true} if a matching subscription was found and updated, {@code false} otherwise
     */
    private boolean findAndUpdateSubscription(StreamObserver<BarrageMessageWriter.MessageView> listener, Consumer<Subscription> updateSubscription) {
        // Typed as List<Subscription>: a raw List would yield Object elements and cannot be iterated as Subscription.
        Function<List<Subscription>, Boolean> findAndUpdate = subscriptions -> {
            for (Subscription sub : subscriptions) {
                if (sub.listener != listener) {
                    continue;
                }
                updateSubscription.accept(sub);
                // Queue the subscription only once; the flag is set while it sits in pendingSubscriptions.
                if (!sub.hasPendingUpdate) {
                    sub.hasPendingUpdate = true;
                    this.pendingSubscriptions.add(sub);
                }
                if (log.isDebugEnabled()) {
                    log.debug().append(this.logPrefix).append("Find and update subscription scheduling immediately.").endl();
                }
                this.updatePropagationJob.scheduleImmediately();
                // Returning here also ends the iteration before the list that may have just been modified is touched again.
                return true;
            }
            return false;
        };
        synchronized (this) {
            return findAndUpdate.apply(this.activeSubscriptions) || findAndUpdate.apply(this.pendingSubscriptions);
        }
    }

    /**
     * Updates the viewport and columns of an existing subscription, using a forward (non-reverse) viewport.
     *
     * @param listener           the observer identifying the subscription
     * @param newViewport        the new viewport, or {@code null}
     * @param columnsToSubscribe the new columns, or {@code null} for all columns
     * @return {@code true} if a subscription for {@code listener} was found
     */
    public boolean updateSubscription(StreamObserver<BarrageMessageWriter.MessageView> listener, @Nullable RowSet newViewport, @Nullable BitSet columnsToSubscribe) {
        final boolean reverseViewport = false;
        return this.updateSubscription(listener, newViewport, columnsToSubscribe, reverseViewport);
    }

    /**
     * Updates the pending viewport, viewport direction and columns of an existing subscription.
     *
     * <p>Two requests are rejected with an {@code INVALID_ARGUMENT} error sent to {@code listener}, after which the
     * subscription is removed: updating a full subscription, and requesting a reverse viewport on a blink table.
     *
     * @param listener           the observer identifying the subscription
     * @param newViewport        the new viewport, or {@code null}; a copy is stored
     * @param columnsToSubscribe the new columns, or {@code null} for all columns; the bit set is cloned
     * @param newReverseViewport whether the new viewport is a reverse viewport
     * @return {@code true} if a subscription for {@code listener} was found
     */
    public boolean updateSubscription(StreamObserver<BarrageMessageWriter.MessageView> listener, @Nullable RowSet newViewport, @Nullable BitSet columnsToSubscribe, boolean newReverseViewport) {
        return this.findAndUpdateSubscription(listener, subscription -> {
            if (subscription.isFullSubscription()) {
                GrpcUtil.safelyError(listener, Code.INVALID_ARGUMENT, "cannot change from full subscription to viewport or vice versa");
                this.removeSubscription(listener);
                return;
            }

            // Replace any previously requested, not yet applied viewport.
            if (subscription.pendingViewport != null) {
                subscription.pendingViewport.close();
            }
            if (newViewport == null) {
                subscription.pendingViewport = null;
            } else {
                subscription.pendingViewport = newViewport.copy();
            }
            subscription.pendingReverseViewport = newReverseViewport;

            if (newReverseViewport && this.isBlinkTable) {
                GrpcUtil.safelyError(listener, Code.INVALID_ARGUMENT, "Reverse viewport is not supported for blink tables");
                this.removeSubscription(listener);
                return;
            }

            if (columnsToSubscribe != null) {
                subscription.pendingColumns = (BitSet)columnsToSubscribe.clone();
            } else {
                // No explicit selection means every column.
                int columnCount = this.chunkSources.length;
                BitSet everyColumn = new BitSet(columnCount);
                everyColumn.set(0, columnCount);
                subscription.pendingColumns = everyColumn;
            }

            if (log.isDebugEnabled()) {
                log.debug()
                        .append(this.logPrefix)
                        .append(subscription.logPrefix)
                        .append("scheduling update immediately, for viewport and column updates.")
                        .endl();
            }
        });
    }

    /**
     * Marks the subscription belonging to {@code listener} for deletion; the propagation job is scheduled
     * immediately by {@code findAndUpdateSubscription}. Does nothing if no such subscription exists.
     *
     * @param listener the observer identifying the subscription
     */
    public void removeSubscription(StreamObserver<BarrageMessageWriter.MessageView> listener) {
        this.findAndUpdateSubscription(listener, subscription -> {
            subscription.pendingDelete = true;
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debug()
                    .append(this.logPrefix)
                    .append(subscription.logPrefix)
                    .append("scheduling update immediately, for removed subscription.")
                    .endl();
        });
    }

    /**
     * Creates the listener to attach to the parent table.
     *
     * @return a new {@code DeltaListener} when the parent is refreshing, otherwise {@code null}
     */
    public InstrumentedTableUpdateListener constructListener() {
        if (!this.parentIsRefreshing) {
            return null;
        }
        return new DeltaListener();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Records the parts of {@code upstream} that current subscribers need and queues them as a pending {@code Delta}.
     *
     * <p>Steps:
     * <ol>
     * <li>Choose the added and modified rows to record: everything when there is at least one full subscription,
     * only rows inside the active viewport(s) when there are only viewport subscriptions, nothing otherwise.</li>
     * <li>For non-blink viewport subscriptions, additionally record rows that move into a viewport because
     * upstream adds/removes changed which keys occupy the viewport positions.</li>
     * <li>Copy the chosen rows for the active columns into {@code deltaColumns}, starting at
     * {@code nextFreeDeltaKey}.</li>
     * <li>Append a {@code Delta} describing the recorded rows to {@code pendingDeltas}.</li>
     * </ol>
     *
     * <p>The caller must hold this producer's monitor.
     *
     * @param upstream the update received from the parent table
     */
    private void enqueueUpdate(TableUpdate upstream) {
        Assert.assertion(Thread.holdsLock(this), "enqueueUpdate must hold lock!");
        TrackingRowSet rowSet = this.parent.getRowSet();
        if (this.isBlinkTable) {
            Assert.assertion(upstream.modified().isEmpty(), "upstream.modified().isEmpty()");
        }

        WritableRowSet addsToRecord;
        WritableRowSet modsToRecord;
        if (this.numFullSubscriptions > 0L) {
            // A full subscription needs every added and modified row.
            addsToRecord = upstream.added().copy();
            modsToRecord = upstream.modified().copy();
            if (this.isBlinkTable) {
                this.blinkTableUpdateSize += upstream.added().size();
            }
        } else if (this.activeViewport != null || this.activeReverseViewport != null) {
            if (this.isBlinkTable) {
                Assert.eqNull((Object)this.activeReverseViewport, "activeReverseViewport");
                modsToRecord = RowSetFactory.empty();
                long newRows = upstream.added().size();
                if (newRows == 0L) {
                    addsToRecord = RowSetFactory.empty();
                } else {
                    // The new rows occupy positions [blinkTableUpdateSize, blinkTableUpdateSize + newRows) of the
                    // accumulated blink update; keep the ones inside the viewport and translate them back to
                    // positions within upstream.added().
                    try (WritableRowSet updateRows = RowSetFactory.fromRange(this.blinkTableUpdateSize, this.blinkTableUpdateSize + newRows - 1L)) {
                        updateRows.retain(this.activeViewport);
                        updateRows.shiftInPlace(-this.blinkTableUpdateSize);
                        this.blinkTableUpdateSize += newRows;
                        // This result must survive: it must not be replaced by the "no subscribers" default below.
                        addsToRecord = upstream.added().subSetForPositions((RowSequence)updateRows);
                    }
                }
            } else {
                // Translate the position-space viewports into the current key space and union them.
                try (WritableRowSet forwardDeltaViewport = this.activeViewport == null ? null : rowSet.subSetForPositions((RowSequence)this.activeViewport);
                     WritableRowSet reverseDeltaViewport = this.activeReverseViewport == null ? null : rowSet.subSetForReversePositions((RowSequence)this.activeReverseViewport)) {
                    WritableRowSet deltaViewport;
                    if (forwardDeltaViewport != null) {
                        if (reverseDeltaViewport != null) {
                            forwardDeltaViewport.insert((RowSet)reverseDeltaViewport);
                        }
                        deltaViewport = forwardDeltaViewport;
                    } else {
                        deltaViewport = reverseDeltaViewport;
                    }
                    addsToRecord = deltaViewport.intersect(upstream.added());
                    modsToRecord = deltaViewport.intersect(upstream.modified());
                }
            }
        } else {
            // Nobody is subscribed to any rows; record nothing.
            addsToRecord = RowSetFactory.empty();
            modsToRecord = RowSetFactory.empty();
        }

        // Adds/removes can change which keys occupy the viewport positions. Skipped when the current or previous
        // row set is empty, and for blink tables.
        if ((this.activeViewport != null || this.activeReverseViewport != null) && (upstream.added().isNonempty() || upstream.removed().isNonempty()) && rowSet.isNonempty() && rowSet.sizePrev() > 0L && !this.isBlinkTable) {
            RowSetBuilderRandom scopedViewBuilder = RowSetFactory.builderRandom();
            try (WritableRowSet prevRowSet = rowSet.copyPrev()) {
                for (Subscription sub : this.activeSubscriptions) {
                    if (!sub.isViewport() || sub.pendingDelete) continue;
                    ShiftInversionHelper inverter = new ShiftInversionHelper(upstream.shifted(), sub.reverseViewport);
                    // NOTE(review): lambda$enqueueUpdate$6 is a compiler-synthesized lambda body that the decompiler
                    // did not inline; presumably it adds the rows entering this viewport range to scopedViewBuilder -
                    // confirm against the original source.
                    sub.viewport.forAllRowKeyRanges((arg_0, arg_1) -> BarrageMessageProducer.lambda$enqueueUpdate$6(sub, rowSet, inverter, (RowSet)prevRowSet, scopedViewBuilder, arg_0, arg_1));
                }
            }
            try (WritableRowSet scoped = scopedViewBuilder.build()) {
                upstream.shifted().apply(scoped);
                scoped.retain((RowSet)rowSet);
                addsToRecord.insert((RowSet)scoped);
            }
        }

        if (log.isDebugEnabled()) {
            log.debug().append((CharSequence)this.logPrefix).append((CharSequence)"updateGraph=").append((LogOutputAppendable)this.parent.getUpdateGraph()).append((CharSequence)", step=").append(this.parent.getUpdateGraph().clock().currentStep()).append((CharSequence)", upstream=").append((LogOutputAppendable)upstream).append((CharSequence)", activeSubscriptions=").append(this.activeSubscriptions.size()).append((CharSequence)", numFullSubscriptions=").append(this.numFullSubscriptions).append((CharSequence)", addsToRecord=").append((LogOutputAppendable)addsToRecord).append((CharSequence)", modsToRecord=").append((LogOutputAppendable)modsToRecord).append((CharSequence)", activeViewport=").append((LogOutputAppendable)this.activeViewport).append((CharSequence)", activeReverseViewport=").append((LogOutputAppendable)this.activeReverseViewport).append((CharSequence)", columns=").append(FormatBitSet.formatBitSet((BitSet)this.activeColumns)).endl();
        }

        // Restrict the modified columns to those that are actively subscribed.
        BitSet modifiedColumns;
        if (upstream.modified().isEmpty()) {
            modifiedColumns = new BitSet();
        } else if (upstream.modifiedColumnSet() == ModifiedColumnSet.ALL) {
            modifiedColumns = (BitSet)this.activeColumns.clone();
        } else {
            modifiedColumns = upstream.modifiedColumnSet().extractAsBitSet();
            modifiedColumns.and(this.activeColumns);
        }

        // Remember where this delta's rows start inside the delta columns.
        long deltaColumnOffset = this.nextFreeDeltaKey;
        if (addsToRecord.isNonempty() || modsToRecord.isNonempty()) {
            // Sized for every active column; only a prefix is filled, the remaining slots stay null.
            FillDeltaContext[] fillDeltaContexts = new FillDeltaContext[this.activeColumns.cardinality()];
            try (SharedContext sharedContext = SharedContext.makeSharedContext();
                 SafeCloseableArray ignored = new SafeCloseableArray((AutoCloseable[])fillDeltaContexts)) {
                int totalSize = LongSizedDataStructure.intSize("BarrageMessageProducer#enqueueUpdate", addsToRecord.size() + modsToRecord.size() + this.nextFreeDeltaKey);
                int deltaChunkSize = (int)Math.min((long)DELTA_CHUNK_SIZE, Math.max(addsToRecord.size(), modsToRecord.size()));
                int aci = 0;
                for (int columnIndex = this.activeColumns.nextSetBit(0); columnIndex >= 0; columnIndex = this.activeColumns.nextSetBit(columnIndex + 1)) {
                    // A column needs a context if there are adds (all active columns are copied) or it was modified.
                    if (!addsToRecord.isEmpty() || modifiedColumns.get(columnIndex)) {
                        this.deltaColumns[columnIndex].ensureCapacity(totalSize);
                        fillDeltaContexts[aci++] = new FillDeltaContext(columnIndex, this.chunkSources[columnIndex], this.deltaColumns[columnIndex], sharedContext, deltaChunkSize);
                    }
                }

                // Copies keysToAdd, chunk by chunk, into the next free range of the delta columns.
                BiConsumer<RowSet, BitSet> recordRows = (keysToAdd, columnsToRecord) -> {
                    try (RowSequence.Iterator rsIt = keysToAdd.getRowSequenceIterator()) {
                        while (rsIt.hasMore()) {
                            RowSequence srcKeys = rsIt.getNextRowSequenceWithLength(DELTA_CHUNK_SIZE);
                            try (RowSequence dstKeys = RowSequenceFactory.forRange(this.nextFreeDeltaKey, this.nextFreeDeltaKey + srcKeys.size() - 1L)) {
                                this.nextFreeDeltaKey += srcKeys.size();
                                for (FillDeltaContext fillDeltaContext : fillDeltaContexts) {
                                    // Contexts are packed at the front of the array; the first null ends them.
                                    if (fillDeltaContext == null) break;
                                    if (!columnsToRecord.get(fillDeltaContext.columnIndex)) continue;
                                    fillDeltaContext.doFillChunk(srcKeys, dstKeys);
                                }
                                sharedContext.reset();
                            }
                        }
                    }
                };
                if (addsToRecord.isNonempty()) {
                    recordRows.accept((RowSet)addsToRecord, this.activeColumns);
                }
                if (modsToRecord.isNonempty()) {
                    recordRows.accept((RowSet)modsToRecord, modifiedColumns);
                }
            }
        }

        if (log.isDebugEnabled()) {
            log.debug().append((CharSequence)this.logPrefix).append((CharSequence)"update accumulation complete for step=").append(this.parent.getUpdateGraph().clock().currentStep()).endl();
        }
        // The recorded row sets are handed to the Delta; they are not closed here.
        this.pendingDeltas.add(new Delta(this.parent.getUpdateGraph().clock().currentStep(), deltaColumnOffset, upstream, addsToRecord, (RowSet)modsToRecord, (BitSet)this.activeColumns.clone(), modifiedColumns));
    }

    /**
     * Schedules the propagation job so that updates are sent at most once per {@code updateIntervalMs}.
     *
     * <ul>
     * <li>If a deferred run is already scheduled for after the last update, nothing is scheduled.</li>
     * <li>If less than {@code updateIntervalMs} has elapsed since the last update, the job is scheduled for
     * {@code lastUpdateTime + updateIntervalMs} and that time is remembered in {@code lastScheduledUpdateTime}.</li>
     * <li>Otherwise the job is scheduled immediately.</li>
     * </ul>
     *
     * <p>The caller must hold this producer's monitor.
     */
    private void schedulePropagation() {
        Assert.assertion(Thread.holdsLock(this), "schedulePropagation must hold lock!");
        // lastUpdateTime is volatile; take one copy so the arithmetic and the log output agree with each other.
        long localLastUpdateTime = this.lastUpdateTime;
        long now = this.scheduler.currentTimeMillis();
        long msSinceLastUpdate = now - localLastUpdateTime;
        if (this.lastScheduledUpdateTime != 0L && this.lastScheduledUpdateTime > this.lastUpdateTime) {
            // A deferred run is still outstanding.
            if (log.isDebugEnabled()) {
                log.debug().append(this.logPrefix).append("Not scheduling update, because last update was ").append(localLastUpdateTime).append(" and now is ").append(now).append(" msSinceLastUpdate=").append(msSinceLastUpdate).append(" interval=").append(this.updateIntervalMs).append(" already scheduled to run at ").append(this.lastScheduledUpdateTime).endl();
            }
        } else if (msSinceLastUpdate < this.updateIntervalMs) {
            // The elapsed time is compared against the interval (a duration), not against the last-update
            // timestamp: an update ran within the current interval, so wait until the interval has passed.
            long nextRunTime = localLastUpdateTime + this.updateIntervalMs;
            if (log.isDebugEnabled()) {
                log.debug().append(this.logPrefix).append("Last Update Time: ").append(localLastUpdateTime).append(" next run: ").append(nextRunTime).endl();
            }
            this.lastScheduledUpdateTime = nextRunTime;
            this.updatePropagationJob.scheduleAt(nextRunTime);
        } else {
            // At least one full interval has elapsed since the last update.
            if (log.isDebugEnabled()) {
                log.debug().append(this.logPrefix).append("Scheduling update immediately, because last update was ").append(localLastUpdateTime).append(" and now is ").append(now).append(" msSinceLastUpdate=").append(msSinceLastUpdate).append(" interval=").append(this.updateIntervalMs).endl();
            }
            this.updatePropagationJob.scheduleImmediately();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateSubscriptionsSnapshotAndPropagate() {
        this.lastUpdateTime = this.scheduler.currentTimeMillis();
        if (log.isDebugEnabled()) {
            log.debug().append((CharSequence)this.logPrefix).append((CharSequence)("Starting update job at " + this.lastUpdateTime)).endl();
        }
        boolean firstSubscription = false;
        boolean pendingChanges = false;
        ArrayList<Subscription> deletedSubscriptions = null;
        BarrageMessageProducer barrageMessageProducer = this;
        synchronized (barrageMessageProducer) {
            List<Subscription> updatedSubscriptions = null;
            if (!this.pendingSubscriptions.isEmpty()) {
                updatedSubscriptions = this.pendingSubscriptions;
                this.pendingSubscriptions = new ArrayList<Subscription>();
            }
            if (updatedSubscriptions != null) {
                for (int i = 0; i < this.activeSubscriptions.size(); ++i) {
                    Subscription sub = this.activeSubscriptions.get(i);
                    if (!sub.pendingDelete) continue;
                    if (deletedSubscriptions == null) {
                        deletedSubscriptions = new ArrayList<Subscription>();
                    }
                    deletedSubscriptions.add(sub);
                    if (!sub.isViewport()) {
                        --this.numFullSubscriptions;
                    }
                    if (sub.isGrowingViewport) {
                        --this.numGrowingSubscriptions;
                    }
                    this.activeSubscriptions.set(i, this.activeSubscriptions.get(this.activeSubscriptions.size() - 1));
                    this.activeSubscriptions.remove(this.activeSubscriptions.size() - 1);
                    --i;
                }
                this.buildPostSnapshotViewports(true);
                for (Subscription subscription : updatedSubscriptions) {
                    if (subscription.pendingDelete) continue;
                    pendingChanges = true;
                    if (!subscription.isGrowingViewport) {
                        subscription.isGrowingViewport = true;
                        ++this.numGrowingSubscriptions;
                    }
                    subscription.hasPendingUpdate = false;
                    if (!subscription.isActive) {
                        firstSubscription |= this.activeSubscriptions.isEmpty();
                        subscription.isActive = true;
                        this.activeSubscriptions.add(subscription);
                    }
                    try (RowSet ignored = subscription.targetViewport;){
                        subscription.targetViewport = subscription.pendingViewport;
                        subscription.pendingViewport = null;
                    }
                    subscription.targetColumns = subscription.pendingColumns;
                    subscription.pendingColumns = null;
                    subscription.targetReverseViewport = subscription.pendingReverseViewport;
                    subscription.isFirstSnapshot = true;
                    if (subscription.growingRemainingViewport != null) {
                        subscription.growingRemainingViewport.close();
                    }
                    subscription.growingRemainingViewport = subscription.targetViewport == null ? RowSetFactory.flat((long)Long.MAX_VALUE) : subscription.targetViewport.copy();
                }
            }
            if (deletedSubscriptions != null && !pendingChanges) {
                this.promoteSnapshotToActive();
            }
        }
        BarrageMessage preSnapshot = null;
        BarrageMessage blinkTableFlushPreSnapshot = null;
        WritableRowSet preSnapRowSetPrev = null;
        WritableRowSet preSnapRowSet = null;
        WritableRowSet postSnapRowSetPrev = null;
        BarrageMessage snapshot = null;
        BarrageMessage postSnapshot = null;
        LinkedList<Subscription> growingSubscriptions = new LinkedList<Subscription>();
        if (this.numGrowingSubscriptions > 0L) {
            long rowsRemaining;
            if (!pendingChanges) {
                this.postSnapshotViewport = this.activeViewport != null ? this.activeViewport.copy() : RowSetFactory.empty();
                this.postSnapshotReverseViewport = this.activeReverseViewport != null ? this.activeReverseViewport.copy() : RowSetFactory.empty();
                this.postSnapshotColumns.clear();
                this.postSnapshotColumns.or(this.activeColumns);
            }
            BitSet snapshotColumns = new BitSet();
            for (Subscription subscription : this.activeSubscriptions) {
                if (!subscription.isGrowingViewport) continue;
                snapshotColumns.or(subscription.targetColumns);
                if (subscription.targetViewport == null) {
                    growingSubscriptions.addLast(subscription);
                    continue;
                }
                growingSubscriptions.addFirst(subscription);
            }
            long columnCount = Math.max(1, snapshotColumns.cardinality());
            if (SUBSCRIPTION_GROWTH_ENABLED) {
                long cellCount = Math.max(BarrageUtil.MIN_SNAPSHOT_CELL_COUNT, Math.min(this.snapshotTargetCellCount, BarrageUtil.MAX_SNAPSHOT_CELL_COUNT));
                rowsRemaining = cellCount / columnCount;
            } else {
                rowsRemaining = Long.MAX_VALUE;
            }
            RowSetBuilderRandom viewportBuilder = RowSetFactory.builderRandom();
            RowSetBuilderRandom reverseViewportBuilder = RowSetFactory.builderRandom();
            try (WritableRowSet snapshotRowSet = RowSetFactory.empty();
                 WritableRowSet reverseSnapshotRowSet = RowSetFactory.empty();){
                for (Subscription subscription : growingSubscriptions) {
                    boolean viewportValid;
                    BitSet addedCols = (BitSet)subscription.targetColumns.clone();
                    addedCols.andNot(subscription.subscribedColumns);
                    boolean bl = viewportValid = subscription.reverseViewport == subscription.targetReverseViewport && addedCols.isEmpty();
                    if (viewportValid && subscription.viewport != null) {
                        if (subscription.isFirstSnapshot) {
                            subscription.snapshotViewport = subscription.growingRemainingViewport.extract(subscription.viewport);
                            if (subscription.targetReverseViewport) {
                                reverseViewportBuilder.addRowSet((RowSet)subscription.snapshotViewport);
                            } else {
                                viewportBuilder.addRowSet((RowSet)subscription.snapshotViewport);
                            }
                        } else {
                            subscription.snapshotViewport = subscription.viewport.copy();
                        }
                    } else {
                        subscription.snapshotViewport = RowSetFactory.empty();
                    }
                    subscription.isFirstSnapshot = false;
                    WritableRowSet currentSet = subscription.targetReverseViewport ? reverseSnapshotRowSet : snapshotRowSet;
                    subscription.growingIncrementalViewport = subscription.growingRemainingViewport.extract((RowSet)currentSet);
                    if (rowsRemaining > 0L) {
                        try (WritableRowSet additional = subscription.growingRemainingViewport.copy();){
                            if (additional.size() > rowsRemaining) {
                                long key = additional.get(rowsRemaining);
                                additional.removeRange(key, 0x7FFFFFFFFFFFFFFEL);
                                subscription.growingRemainingViewport.removeRange(0L, key - 1L);
                            } else {
                                subscription.growingRemainingViewport.clear();
                            }
                            subscription.growingIncrementalViewport.insert((RowSet)additional);
                            currentSet.insert((RowSet)additional);
                            if (subscription.targetReverseViewport) {
                                reverseViewportBuilder.addRowSet((RowSet)additional);
                            } else {
                                viewportBuilder.addRowSet((RowSet)additional);
                            }
                            rowsRemaining -= additional.size();
                        }
                    }
                    subscription.snapshotViewport.insert((RowSet)subscription.growingIncrementalViewport);
                    subscription.snapshotColumns = (BitSet)subscription.targetColumns.clone();
                    subscription.snapshotReverseViewport = subscription.targetReverseViewport;
                }
                try (WritableRowSet vp = viewportBuilder.build();
                     WritableRowSet rvp = reverseViewportBuilder.build();){
                    this.postSnapshotViewport.insert((RowSet)vp);
                    this.postSnapshotReverseViewport.insert((RowSet)rvp);
                }
                this.postSnapshotColumns.or(snapshotColumns);
                long start = System.nanoTime();
                if (!this.isBlinkTable) {
                    snapshot = this.getSnapshot(growingSubscriptions, snapshotColumns, (RowSet)snapshotRowSet, (RowSet)reverseSnapshotRowSet);
                } else {
                    snapshot = this.getSnapshot(growingSubscriptions, snapshotColumns, (RowSet)RowSetFactory.empty(), (RowSet)RowSetFactory.empty());
                    if (!snapshot.rowsAdded.isEmpty()) {
                        snapshot.rowsAdded.close();
                        snapshot.rowsAdded = RowSetFactory.empty();
                        snapshot.tableSize = 0L;
                    }
                }
                long elapsed = System.nanoTime() - start;
                this.recordMetric(stats -> stats.snapshot, elapsed);
                if (SUBSCRIPTION_GROWTH_ENABLED && !snapshot.rowsIncluded.isEmpty()) {
                    PeriodicUpdateGraph updateGraph = (PeriodicUpdateGraph)this.parent.getUpdateGraph().cast();
                    long targetNanos = (long)(BarrageUtil.TARGET_SNAPSHOT_PERCENTAGE * (double)updateGraph.getTargetCycleDurationMillis() * 1000000.0);
                    long nanosPerCell = elapsed / (snapshot.rowsIncluded.size() * columnCount);
                    this.snapshotNanosPerCell = this.snapshotNanosPerCell == 0.0 ? (double)nanosPerCell : this.snapshotNanosPerCell * 0.9 + (double)nanosPerCell * 0.1;
                    this.snapshotTargetCellCount = (long)((double)targetNanos / Math.max(1.0, this.snapshotNanosPerCell));
                }
            }
        }
        BarrageMessageProducer columnCount = this;
        synchronized (columnCount) {
            int deltaSplitIdx;
            if (growingSubscriptions.isEmpty() && this.pendingDeltas.isEmpty() && this.pendingError == null) {
                return;
            }
            long maxStep = snapshot != null ? snapshot.firstSeq : Long.MAX_VALUE;
            for (deltaSplitIdx = this.pendingDeltas.size(); deltaSplitIdx > 0 && this.pendingDeltas.get((int)(deltaSplitIdx - 1)).step > maxStep; --deltaSplitIdx) {
            }
            if (snapshot != null && deltaSplitIdx > 0) {
                this.flipSnapshotStateForSubscriptions(growingSubscriptions);
            }
            if (!firstSubscription && deltaSplitIdx > 0) {
                long startTm = System.nanoTime();
                preSnapRowSetPrev = this.propagationRowSet.copy();
                preSnapshot = this.aggregateUpdatesInRange(0, deltaSplitIdx);
                this.recordMetric(stats -> stats.aggregate, System.nanoTime() - startTm);
                preSnapRowSet = this.propagationRowSet.copy();
            }
            if (this.isBlinkTable && this.lastBlinkTableUpdateSize != 0L && snapshot != null) {
                blinkTableFlushPreSnapshot = this.aggregateUpdatesInRange(-1, -1);
            }
            if (firstSubscription) {
                Assert.neqNull(snapshot, (String)"snapshot");
                this.propagationRowSet.clear();
                this.propagationRowSet.insert(snapshot.rowsAdded);
            }
            if (snapshot != null && deltaSplitIdx > 0) {
                this.flipSnapshotStateForSubscriptions(growingSubscriptions);
            }
            if (deltaSplitIdx < this.pendingDeltas.size()) {
                long startTm = System.nanoTime();
                postSnapRowSetPrev = this.propagationRowSet.copy();
                postSnapshot = this.aggregateUpdatesInRange(deltaSplitIdx, this.pendingDeltas.size());
                this.recordMetric(stats -> stats.aggregate, System.nanoTime() - startTm);
            }
            this.clearObjectDeltaColumns(this.objectColumnsToClear);
            if (deletedSubscriptions != null || pendingChanges) {
                this.objectColumnsToClear.clear();
                this.objectColumnsToClear.or(this.objectColumns);
                this.objectColumnsToClear.and(this.activeColumns);
            }
            this.nextFreeDeltaKey = 0L;
            for (Delta delta : this.pendingDeltas) {
                delta.close();
            }
            this.blinkTableUpdateSize = 0L;
            this.pendingDeltas.clear();
        }
        if (preSnapshot != null) {
            long startTm = System.nanoTime();
            this.propagateToSubscribers(preSnapshot, (RowSet)preSnapRowSetPrev, (RowSet)preSnapRowSet);
            this.recordMetric(stats -> stats.propagate, System.nanoTime() - startTm);
            preSnapRowSetPrev.close();
            preSnapRowSet.close();
        }
        if (blinkTableFlushPreSnapshot != null) {
            long startTm = System.nanoTime();
            try (WritableRowSet fakeTableRowSet = RowSetFactory.empty();){
                this.propagateToSubscribers(blinkTableFlushPreSnapshot, (RowSet)fakeTableRowSet, (RowSet)fakeTableRowSet);
            }
            this.recordMetric(stats -> stats.propagate, System.nanoTime() - startTm);
        }
        if (snapshot != null) {
            try (BarrageMessageWriter snapshotGenerator = this.streamGeneratorFactory.newMessageWriter(snapshot, this.chunkWriters, this::recordWriteMetrics);){
                if (log.isDebugEnabled()) {
                    log.debug().append((CharSequence)this.logPrefix).append((CharSequence)"Sending snapshot to ").append(this.activeSubscriptions.size()).append((CharSequence)" subscriber(s).").endl();
                }
                for (Subscription subscription2 : growingSubscriptions) {
                    if (subscription2.pendingDelete) continue;
                    long startTm = System.nanoTime();
                    this.propagateSnapshotForSubscription(subscription2, snapshotGenerator);
                    this.recordMetric(stats -> stats.propagate, System.nanoTime() - startTm);
                }
            }
        }
        if (postSnapshot != null) {
            long startTm = System.nanoTime();
            this.propagateToSubscribers(postSnapshot, (RowSet)postSnapRowSetPrev, (RowSet)this.propagationRowSet);
            this.recordMetric(stats -> stats.propagate, System.nanoTime() - startTm);
            postSnapRowSetPrev.close();
        }
        if (deletedSubscriptions != null) {
            for (Subscription subscription : deletedSubscriptions) {
                try {
                    subscription.listener.onCompleted();
                }
                catch (Exception subscription2) {}
            }
        }
        if (this.pendingError != null) {
            StatusRuntimeException ex = this.errorTransformer.transform(this.pendingError);
            for (Subscription subscription : this.activeSubscriptions) {
                GrpcUtil.safelyError(subscription.listener, (StatusRuntimeException)ex);
            }
        }
        if (this.numGrowingSubscriptions > 0L) {
            if (log.isDebugEnabled()) {
                log.info().append((CharSequence)this.logPrefix).append((CharSequence)"Have ").append(this.numGrowingSubscriptions).append((CharSequence)" growing subscriptions; scheduling next snapshot immediately.").endl();
            }
            this.updatePropagationJob.scheduleImmediately();
        }
        this.lastUpdateTime = this.scheduler.currentTimeMillis();
        if (log.isDebugEnabled()) {
            log.debug().append((CharSequence)this.logPrefix).append((CharSequence)("Completed Propagation: " + this.lastUpdateTime)).endl();
        }
    }

    /**
     * Sends {@code message} to every active subscription that is neither waiting for its initial snapshot nor
     * pending deletion.
     *
     * <p>A single message writer is created for {@code message} and closed when the loop finishes. For each
     * subscription, the viewport, column set and reverse flag are taken from the {@code snapshot*} fields while
     * {@code snapshotViewport} is non-null, and from {@code viewport} / {@code subscribedColumns} /
     * {@code reverseViewport} otherwise. When the chosen viewport is null, no client views are computed and
     * {@code null} is passed for both.
     *
     * <p>If building or delivering the sub-view throws, the listener is notified through {@code onError} on a
     * best-effort basis and the subscription is handed to {@code removeSubscription}.
     *
     * @param message the message to deliver
     * @param propRowSetForMessagePrev row set used to resolve the viewport positions into the "previous" client view
     * @param propRowSetForMessage row set used to resolve the viewport positions into the current client view
     */
    private void propagateToSubscribers(BarrageMessage message, RowSet propRowSetForMessagePrev, RowSet propRowSetForMessage) {
        try (BarrageMessageWriter bmw = this.streamGeneratorFactory.newMessageWriter(message, this.chunkWriters, this::recordWriteMetrics)) {
            for (Subscription subscription : this.activeSubscriptions) {
                if (subscription.pendingInitialSnapshot || subscription.pendingDelete) {
                    continue;
                }
                boolean isPreSnapshot = subscription.snapshotViewport != null;
                WritableRowSet vp = isPreSnapshot ? subscription.snapshotViewport : subscription.viewport;
                BitSet cols = isPreSnapshot ? subscription.snapshotColumns : subscription.subscribedColumns;
                boolean isReversed = isPreSnapshot ? subscription.snapshotReverseViewport : subscription.reverseViewport;
                // try-with-resources skips null resources and never discards a pending exception. The previous
                // hand-rolled finally blocks used `continue` for the null case, which swallowed any failure
                // raised while sending to a subscription without a viewport.
                try (WritableRowSet clientViewPrev = vp != null ? propRowSetForMessagePrev.subSetForPositions((RowSequence)vp, isReversed) : null;
                     WritableRowSet clientView = vp != null ? propRowSetForMessage.subSetForPositions((RowSequence)vp, isReversed) : null) {
                    // NOTE(review): the reverse flag passed here is subscription.reverseViewport, not isReversed;
                    // kept exactly as it was - confirm this is intended during the pre-snapshot phase.
                    subscription.listener.onNext((Object)bmw.getSubView(subscription.options, false, subscription.isFullSubscription(), (RowSet)vp, subscription.reverseViewport, (RowSet)clientViewPrev, (RowSet)clientView, cols));
                } catch (Exception e) {
                    try {
                        subscription.listener.onError((Throwable)this.errorTransformer.transform(e));
                    } catch (Exception ignored) {
                        // Best effort only: the listener may already be unusable, and the subscription is
                        // removed below regardless.
                    }
                    this.removeSubscription(subscription.listener);
                }
            }
        }
    }

    /**
     * Overwrites the used portion of the selected object delta columns with null values.
     *
     * <p>For every set bit in {@code objectColumnsToClear}, the delta column at that index is treated as an
     * {@code ObjectArraySource}. Positions from 0 up to {@code min(nextFreeDeltaKey, capacity)} are cleared by
     * repeatedly re-pointing one reusable chunk at the column's backing store and filling that chunk with nulls
     * across its full size.
     *
     * @param objectColumnsToClear indices into {@code deltaColumns} of the columns to clear
     */
    private void clearObjectDeltaColumns(@NotNull BitSet objectColumnsToClear) {
        try (ResettableWritableObjectChunk scratch = ResettableWritableObjectChunk.makeResettableChunk()) {
            for (int col = objectColumnsToClear.nextSetBit(0); col >= 0; col = objectColumnsToClear.nextSetBit(col + 1)) {
                ObjectArraySource objectSource = (ObjectArraySource)this.deltaColumns[col];
                // Only keys handed out so far need clearing, and never more than the source can hold.
                long limit = Math.min(this.nextFreeDeltaKey, objectSource.getCapacity());
                long offset = 0L;
                while (offset < limit) {
                    objectSource.resetWritableChunkToBackingStore((ResettableWritableChunk)scratch, offset);
                    scratch.fillWithNullValue(0, scratch.size());
                    // Advance by however many positions the reset chunk covered.
                    offset += (long)scratch.size();
                }
            }
        }
    }

    /**
     * Delivers the snapshot held by {@code snapshotGenerator} to one subscription when it is owed one, then
     * releases the subscription's per-snapshot state.
     *
     * <p>A snapshot is sent when the subscription is still pending its initial snapshot, or when it has a
     * non-null {@code snapshotViewport} or {@code snapshotColumns}. A subscription pending its initial snapshot
     * is sent a schema view before the snapshot sub-view. If sending throws, the listener is errored through
     * {@code GrpcUtil.safelyError} and the subscription is passed to {@code removeSubscription}.
     *
     * <p>Whether or not anything was sent, {@code snapshotColumns} is reset to null, {@code snapshotViewport} and
     * {@code growingIncrementalViewport} are closed and nulled, and {@code pendingInitialSnapshot} is cleared.
     *
     * @param subscription the subscription to update
     * @param snapshotGenerator writer wrapping the snapshot message; its {@code rowsAdded} resolves viewport
     *        positions into key space
     */
    private void propagateSnapshotForSubscription(Subscription subscription, BarrageMessageWriter snapshotGenerator) {
        final boolean mustSend = subscription.pendingInitialSnapshot
                || subscription.snapshotViewport != null
                || subscription.snapshotColumns != null;
        // The snapshot column set is consumed here; it only serves as a "snapshot owed" marker in this method.
        subscription.snapshotColumns = null;

        if (mustSend) {
            if (log.isDebugEnabled()) {
                log.debug().append((CharSequence)this.logPrefix).append((CharSequence)"Sending snapshot to ").append(System.identityHashCode(subscription)).endl();
            }
            final boolean isFull = subscription.isFullSubscription();
            // Full subscriptions resolve the rows added in this growth step; viewport subscriptions resolve
            // their current viewport.
            final RowSequence currentPositions = (RowSequence)(isFull ? subscription.growingIncrementalViewport : subscription.viewport);
            try (WritableRowSet currentKeys = snapshotGenerator.getMessage().rowsAdded.subSetForPositions(currentPositions, subscription.reverseViewport);
                 WritableRowSet previousKeys = !isFull
                         ? snapshotGenerator.getMessage().rowsAdded.subSetForPositions((RowSequence)subscription.snapshotViewport, subscription.snapshotReverseViewport)
                         : null) {
                if (subscription.pendingInitialSnapshot) {
                    // First contact with this listener: send the schema ahead of any data.
                    subscription.listener.onNext((Object)this.streamGeneratorFactory.getSchemaView(builder -> BarrageUtil.makeTableSchemaPayload((FlatBufferBuilder)builder, (BarrageOptions)subscription.options, (TableDefinition)this.parent.getDefinition(), (Map)this.parent.getAttributes(), (boolean)this.parent.isFlat())));
                }
                subscription.listener.onNext((Object)snapshotGenerator.getSubView(subscription.options, subscription.pendingInitialSnapshot, isFull, subscription.viewport, subscription.reverseViewport, (RowSet)previousKeys, (RowSet)currentKeys, subscription.subscribedColumns));
            }
            catch (Exception failure) {
                GrpcUtil.safelyError(subscription.listener, (StatusRuntimeException)this.errorTransformer.transform(failure));
                this.removeSubscription(subscription.listener);
            }
        }

        // Release per-snapshot row sets regardless of whether a snapshot went out.
        if (subscription.snapshotViewport != null) {
            subscription.snapshotViewport.close();
            subscription.snapshotViewport = null;
        }
        if (subscription.growingIncrementalViewport != null) {
            subscription.growingIncrementalViewport.close();
            subscription.growingIncrementalViewport = null;
        }
        subscription.pendingInitialSnapshot = false;
    }

    private BarrageMessage aggregateUpdatesInRange(int startDelta, int endDelta) {
        Delta firstDelta;
        Assert.assertion((boolean)Thread.holdsLock((Object)this), (String)"propagateUpdatesInRange must hold lock!");
        boolean singleDelta = endDelta - startDelta == 1;
        BarrageMessage downstream = new BarrageMessage();
        downstream.firstSeq = startDelta < 0 ? -1L : this.pendingDeltas.get((int)startDelta).step;
        long l = downstream.lastSeq = endDelta < 1 ? -1L : this.pendingDeltas.get((int)(endDelta - 1)).step;
        if (this.isBlinkTable) {
            long size = 0L;
            RowSetBuilderSequential recordedBuilder = RowSetFactory.builderSequential();
            for (int ii = startDelta; ii < endDelta; ++ii) {
                Delta delta = this.pendingDeltas.get(ii);
                try (WritableRowSet positions = delta.update.added().invert((RowSet)delta.recordedAdds);){
                    positions.shiftInPlace(size);
                    recordedBuilder.appendRowSequence((RowSequence)positions);
                }
                size += delta.update.added().size();
            }
            TableUpdateImpl update = new TableUpdateImpl((RowSet)RowSetFactory.flat((long)size), (RowSet)RowSetFactory.flat((long)this.lastBlinkTableUpdateSize), (RowSet)RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY);
            boolean hasDelta = startDelta < endDelta;
            Delta origDelta = hasDelta ? this.pendingDeltas.get(startDelta) : null;
            firstDelta = new Delta(-1L, hasDelta ? origDelta.deltaColumnOffset : 0L, (TableUpdate)update, recordedBuilder.build(), (RowSet)RowSetFactory.empty(), hasDelta ? origDelta.subscribedColumns : new BitSet(), new BitSet());
            this.lastBlinkTableUpdateSize = size;
        } else {
            firstDelta = this.pendingDeltas.get(startDelta);
        }
        if (singleDelta || this.isBlinkTable) {
            ChunkSource.FillContext fc;
            WritableChunk chunk;
            int chunkCapacity;
            RowSequence rs;
            RowSequence.Iterator it;
            WritableColumnSource<?> deltaColumn;
            int ci;
            WritableRowSet localAdded = firstDelta.recordedAdds.isEmpty() ? RowSetFactory.empty() : RowSetFactory.fromRange((long)firstDelta.deltaColumnOffset, (long)(firstDelta.deltaColumnOffset + firstDelta.recordedAdds.size() - 1L));
            WritableRowSet localModified = firstDelta.recordedMods.isEmpty() ? RowSetFactory.empty() : RowSetFactory.fromRange((long)(firstDelta.deltaColumnOffset + firstDelta.recordedAdds.size()), (long)(firstDelta.deltaColumnOffset + firstDelta.recordedAdds.size() + firstDelta.recordedMods.size() - 1L));
            BitSet addColumnSet = firstDelta.recordedAdds.isEmpty() ? new BitSet() : firstDelta.subscribedColumns;
            BitSet modColumnSet = firstDelta.modifiedColumns;
            downstream.rowsAdded = firstDelta.update.added().copy();
            downstream.rowsRemoved = firstDelta.update.removed().copy();
            downstream.shifted = firstDelta.update.shifted();
            downstream.rowsIncluded = firstDelta.recordedAdds.copy();
            downstream.addColumnData = new BarrageMessage.AddColumnData[this.chunkSources.length];
            downstream.modColumnData = new BarrageMessage.ModColumnData[this.chunkSources.length];
            for (ci = 0; ci < downstream.addColumnData.length; ++ci) {
                deltaColumn = this.deltaColumns[ci];
                BarrageMessage.AddColumnData adds = new BarrageMessage.AddColumnData();
                adds.data = new ArrayList();
                adds.chunkType = deltaColumn.getChunkType();
                downstream.addColumnData[ci] = adds;
                if (addColumnSet.get(ci)) {
                    it = localAdded.getRowSequenceIterator();
                    try {
                        while (it.hasMore()) {
                            rs = it.getNextRowSequenceWithLength((long)ConstructSnapshot.SNAPSHOT_CHUNK_SIZE);
                            chunkCapacity = rs.intSize("serializeItems");
                            chunk = adds.chunkType.makeWritableChunk(chunkCapacity);
                            fc = deltaColumn.makeFillContext(chunkCapacity);
                            try {
                                deltaColumn.fillChunk(fc, chunk, rs);
                            }
                            finally {
                                if (fc != null) {
                                    fc.close();
                                }
                            }
                            adds.data.add(chunk);
                        }
                    }
                    finally {
                        if (it != null) {
                            it.close();
                        }
                    }
                }
                adds.type = this.realColumnType[ci];
                adds.componentType = this.realColumnComponentType[ci];
            }
            for (ci = 0; ci < downstream.modColumnData.length; ++ci) {
                deltaColumn = this.deltaColumns[ci];
                BarrageMessage.ModColumnData mods = new BarrageMessage.ModColumnData();
                mods.data = new ArrayList();
                mods.chunkType = deltaColumn.getChunkType();
                downstream.modColumnData[ci] = mods;
                if (modColumnSet.get(ci)) {
                    mods.rowsModified = firstDelta.recordedMods.copy();
                    it = localModified.getRowSequenceIterator();
                    try {
                        while (it.hasMore()) {
                            rs = it.getNextRowSequenceWithLength((long)ConstructSnapshot.SNAPSHOT_CHUNK_SIZE);
                            chunkCapacity = rs.intSize("serializeItems");
                            chunk = mods.chunkType.makeWritableChunk(chunkCapacity);
                            fc = deltaColumn.makeFillContext(chunkCapacity);
                            try {
                                deltaColumn.fillChunk(fc, chunk, rs);
                            }
                            finally {
                                if (fc != null) {
                                    fc.close();
                                }
                            }
                            mods.data.add(chunk);
                        }
                    }
                    finally {
                        if (it != null) {
                            it.close();
                        }
                    }
                }
                mods.rowsModified = RowSetFactory.empty();
                mods.type = this.realColumnType[ci];
                mods.componentType = this.realColumnComponentType[ci];
            }
        } else {
            UpdateCoalescer coalescer = new UpdateCoalescer((RowSet)this.propagationRowSet, firstDelta.update);
            for (int i = startDelta + 1; i < endDelta; ++i) {
                coalescer.update(this.pendingDeltas.get((int)i).update);
            }
            BitSet addColumnSet = new BitSet();
            BitSet modColumnSet = new BitSet();
            WritableRowSet localAdded = RowSetFactory.empty();
            for (int i = startDelta; i < endDelta; ++i) {
                Delta delta = this.pendingDeltas.get(i);
                localAdded.remove(delta.update.removed());
                delta.update.shifted().apply(localAdded);
                if (localAdded.isEmpty()) {
                    addColumnSet.clear();
                }
                if (delta.recordedAdds.isNonempty()) {
                    if (addColumnSet.isEmpty()) {
                        addColumnSet.or(delta.subscribedColumns);
                    } else {
                        Assert.equals((Object)delta.subscribedColumns, (String)"delta.subscribedColumns", (Object)addColumnSet, (String)"addColumnSet");
                    }
                    localAdded.insert((RowSet)delta.recordedAdds);
                }
                if (!delta.recordedMods.isNonempty()) continue;
                modColumnSet.or(delta.modifiedColumns);
            }
            HashMap infoCache = new HashMap();
            final class ColumnInfo {
                final WritableRowSet modified = RowSetFactory.empty();
                final WritableRowSet recordedMods = RowSetFactory.empty();
                long[][] addedMappings;
                long[][] modifiedMappings;

                ColumnInfo() {
                }
            }
            IntFunction<ColumnInfo> getColumnInfo = columnIndex -> {
                BitSet deltasThatModifyThisColumn = new BitSet();
                for (int i = startDelta; i < endDelta; ++i) {
                    if (!this.pendingDeltas.get((int)i).modifiedColumns.get(columnIndex)) continue;
                    deltasThatModifyThisColumn.set(i);
                }
                ColumnInfo ci = (ColumnInfo)infoCache.get(deltasThatModifyThisColumn);
                if (ci != null) {
                    return ci;
                }
                ColumnInfo retval = new ColumnInfo();
                for (int i = startDelta; i < endDelta; ++i) {
                    Delta delta = this.pendingDeltas.get(i);
                    retval.modified.remove(delta.update.removed());
                    retval.recordedMods.remove(delta.update.removed());
                    delta.update.shifted().apply(retval.modified);
                    delta.update.shifted().apply(retval.recordedMods);
                    if (!deltasThatModifyThisColumn.get(i)) continue;
                    retval.modified.insert(delta.update.modified());
                    retval.recordedMods.insert(delta.recordedMods);
                }
                retval.modified.remove((RowSet)coalescer.added);
                retval.recordedMods.remove((RowSet)coalescer.added);
                retval.addedMappings = BarrageMessageProducer.newMappingArray(localAdded.size());
                retval.modifiedMappings = BarrageMessageProducer.newMappingArray(retval.recordedMods.size());
                WritableRowSet unfilledAdds = localAdded.isEmpty() ? RowSetFactory.empty() : RowSetFactory.flat((long)localAdded.size());
                WritableRowSet unfilledMods = retval.recordedMods.isEmpty() ? RowSetFactory.empty() : RowSetFactory.flat((long)retval.recordedMods.size());
                WritableRowSet addedRemaining = localAdded.copy();
                WritableRowSet modifiedRemaining = retval.recordedMods.copy();
                for (int i = endDelta - 1; !(i < startDelta || addedRemaining.isEmpty() && modifiedRemaining.isEmpty()); --i) {
                    Delta delta = this.pendingDeltas.get(i);
                    BiConsumer<Boolean, Boolean> applyMapping = (addedMapping, recordedAdds) -> {
                        WritableRowSet remaining = addedMapping != false ? addedRemaining : modifiedRemaining;
                        WritableRowSet deltaRecorded = recordedAdds != false ? delta.recordedAdds : delta.recordedMods;
                        try (WritableRowSet recorded = remaining.intersect((RowSet)deltaRecorded);
                             WritableRowSet sourceRows = deltaRecorded.invert((RowSet)recorded);
                             WritableRowSet destinationsInPosSpace = remaining.invert((RowSet)recorded);
                             WritableRowSet rowsToFill = (addedMapping != false ? unfilledAdds : unfilledMods).subSetForPositions((RowSequence)destinationsInPosSpace);){
                            sourceRows.shiftInPlace(delta.deltaColumnOffset + (recordedAdds != false ? 0L : delta.recordedAdds.size()));
                            remaining.remove((RowSet)recorded);
                            if (addedMapping.booleanValue()) {
                                unfilledAdds.remove((RowSet)rowsToFill);
                            } else {
                                unfilledMods.remove((RowSet)rowsToFill);
                            }
                            BarrageMessageProducer.applyRedirMapping((RowSet)rowsToFill, (RowSet)sourceRows, addedMapping != false ? retval.addedMappings : retval.modifiedMappings);
                        }
                    };
                    applyMapping.accept(true, true);
                    applyMapping.accept(false, true);
                    if (deltasThatModifyThisColumn.get(i)) {
                        applyMapping.accept(true, false);
                        applyMapping.accept(false, false);
                    }
                    delta.update.shifted().unapply(addedRemaining);
                    delta.update.shifted().unapply(modifiedRemaining);
                }
                if (!unfilledAdds.isEmpty()) {
                    Assert.assertion((boolean)false, (String)("Error: added:" + String.valueOf(coalescer.added) + " unfilled:" + String.valueOf(unfilledAdds) + " missing:" + String.valueOf(coalescer.added.subSetForPositions((RowSequence)unfilledAdds))));
                }
                Assert.eq((long)unfilledAdds.size(), (String)"unfilledAdds.size()", (long)0L);
                Assert.eq((long)unfilledMods.size(), (String)"unfilledMods.size()", (long)0L);
                infoCache.put(deltasThatModifyThisColumn, retval);
                return retval;
            };
            if (coalescer.modifiedColumnSet == ModifiedColumnSet.ALL) {
                modColumnSet.set(0, this.deltaColumns.length);
            } else {
                modColumnSet.or(coalescer.modifiedColumnSet.extractAsBitSet());
            }
            downstream.rowsAdded = coalescer.added;
            downstream.rowsRemoved = coalescer.removed;
            downstream.shifted = coalescer.shifted;
            downstream.rowsIncluded = localAdded;
            downstream.addColumnData = new BarrageMessage.AddColumnData[this.chunkSources.length];
            downstream.modColumnData = new BarrageMessage.ModColumnData[this.chunkSources.length];
            for (int ci = 0; ci < downstream.addColumnData.length; ++ci) {
                WritableColumnSource<?> deltaColumn = this.deltaColumns[ci];
                BarrageMessage.AddColumnData adds = new BarrageMessage.AddColumnData();
                adds.data = new ArrayList();
                adds.chunkType = deltaColumn.getChunkType();
                downstream.addColumnData[ci] = adds;
                if (addColumnSet.get(ci)) {
                    ColumnInfo info = getColumnInfo.apply(ci);
                    for (long[] addedMapping : info.addedMappings) {
                        WritableChunk chunk = adds.chunkType.makeWritableChunk(addedMapping.length);
                        try (ChunkSource.FillContext fc = deltaColumn.makeFillContext(addedMapping.length);){
                            ((FillUnordered)deltaColumn).fillChunkUnordered(fc, chunk, LongChunk.chunkWrap((long[])addedMapping));
                        }
                        adds.data.add(chunk);
                    }
                }
                adds.type = this.realColumnType[ci];
                adds.componentType = this.realColumnComponentType[ci];
            }
            int numActualModCols = 0;
            for (int ci = 0; ci < downstream.modColumnData.length; ++ci) {
                WritableColumnSource<?> sourceColumn = this.deltaColumns[ci];
                BarrageMessage.ModColumnData mods = new BarrageMessage.ModColumnData();
                mods.data = new ArrayList();
                mods.chunkType = sourceColumn.getChunkType();
                downstream.modColumnData[numActualModCols++] = mods;
                if (modColumnSet.get(ci)) {
                    ColumnInfo info = getColumnInfo.apply(ci);
                    mods.rowsModified = info.recordedMods.copy();
                    for (long[] modifiedMapping : info.modifiedMappings) {
                        WritableChunk chunk = mods.chunkType.makeWritableChunk(modifiedMapping.length);
                        try (ChunkSource.FillContext fc = sourceColumn.makeFillContext(modifiedMapping.length);){
                            ((FillUnordered)sourceColumn).fillChunkUnordered(fc, chunk, LongChunk.chunkWrap((long[])modifiedMapping));
                        }
                        mods.data.add(chunk);
                    }
                } else {
                    mods.rowsModified = RowSetFactory.empty();
                }
                mods.type = this.realColumnType[ci];
                mods.componentType = this.realColumnComponentType[ci];
            }
        }
        this.propagationRowSet.remove(downstream.rowsRemoved);
        downstream.shifted.apply(this.propagationRowSet);
        this.propagationRowSet.insert(downstream.rowsAdded);
        downstream.tableSize = this.propagationRowSet.size();
        return downstream;
    }

    /**
     * Allocates a two-level mapping table with room for {@code size} entries, split into chunks of at most
     * {@code ConstructSnapshot.SNAPSHOT_CHUNK_SIZE} slots. Every slot is pre-filled with {@code -1}.
     *
     * @param size total number of slots required across all chunks
     * @return the chunked array; only the final chunk may be shorter than the chunk size
     */
    private static long[][] newMappingArray(long size) {
        final long chunkCapacity = ConstructSnapshot.SNAPSHOT_CHUNK_SIZE;
        // ceil(size / chunkCapacity), validated to fit in an int
        final long chunksNeeded = (size + chunkCapacity - 1L) / chunkCapacity;
        final int numChunks = LongSizedDataStructure.intSize("BarrageMessageProducer", chunksNeeded);
        final int tailLength = (int) (size % chunkCapacity);

        final long[][] mapping = new long[numChunks][];
        for (int chunkIdx = 0; chunkIdx < numChunks; ++chunkIdx) {
            // only the last chunk can be partial, and only when size is not a multiple of the capacity
            final boolean isPartialTail = chunkIdx == numChunks - 1 && tailLength != 0;
            final long[] chunk = new long[isPartialTail ? tailLength : ConstructSnapshot.SNAPSHOT_CHUNK_SIZE];
            Arrays.fill(chunk, -1L);
            mapping[chunkIdx] = chunk;
        }
        return mapping;
    }

    /**
     * Writes {@code values} into the chunked {@code mapping} at the slots named by {@code keys}, pairing the two
     * row sets element by element in iteration order.
     *
     * <p>
     * Each key is treated as a flat index into {@code mapping}: the chunk is {@code key / SNAPSHOT_CHUNK_SIZE} and
     * the offset within it is {@code key % SNAPSHOT_CHUNK_SIZE}. A slot must still hold {@code -1} when it is
     * written; anything else trips an assertion.
     *
     * @param keys destination slots (flat indices into {@code mapping})
     * @param values values to store; must have the same size as {@code keys}
     * @param mapping chunked destination array, as produced by {@code newMappingArray}
     */
    private static void applyRedirMapping(RowSet keys, RowSet values, long[][] mapping) {
        Assert.eq(keys.size(), "keys.size()", values.size(), "values.size()");
        // The iterator is a closeable resource; release it even if an assertion inside the loop fires.
        try (final RowSet.Iterator valueIt = values.iterator()) {
            keys.forAllRowKeys(key -> {
                final int chunkIdx = (int) (key / (long) ConstructSnapshot.SNAPSHOT_CHUNK_SIZE);
                final int offset = (int) (key % (long) ConstructSnapshot.SNAPSHOT_CHUNK_SIZE);
                final long[] chunk = mapping[chunkIdx];
                Assert.eq(chunk[offset], "chunk[keyIdx]", -1L, "RowSet.NULL_ROW_KEY");
                chunk[offset] = valueIt.nextLong();
            });
        }
    }

    /**
     * Exchanges each subscription's current viewport, direction and column set with its snapshot counterparts.
     * Calling this twice on the same subscriptions restores the original state.
     *
     * @param subscriptions the subscriptions whose live and snapshot state should be swapped
     */
    private void flipSnapshotStateForSubscriptions(List<Subscription> subscriptions) {
        subscriptions.forEach(sub -> {
            // viewport <-> snapshotViewport
            final RowSet liveViewport = sub.viewport;
            sub.viewport = sub.snapshotViewport;
            sub.snapshotViewport = (WritableRowSet) liveViewport;

            // reverseViewport <-> snapshotReverseViewport
            final boolean liveReverse = sub.reverseViewport;
            sub.reverseViewport = sub.snapshotReverseViewport;
            sub.snapshotReverseViewport = liveReverse;

            // subscribedColumns <-> snapshotColumns
            final BitSet liveColumns = sub.subscribedColumns;
            sub.subscribedColumns = sub.snapshotColumns;
            sub.snapshotColumns = liveColumns;
        });
    }

    /**
     * Completes the growing-viewport phase for every subscription in {@code subscriptions} that has nothing left to
     * grow into.
     *
     * <p>
     * A subscription counts as complete when its remaining viewport is empty, when the first remaining row key is at
     * or beyond {@code parentTableSize}, or when this producer serves a blink table. A completed subscription adopts
     * its target viewport, the growing/full subscription counters are adjusted, and the post-snapshot viewports are
     * rebuilt once at the end if anything completed.
     *
     * @param subscriptions the subscriptions that took part in the snapshot
     */
    private void finalizeSnapshotForSubscriptions(List<Subscription> subscriptions) {
        boolean anyCompleted = false;
        for (final Subscription sub : subscriptions) {
            final boolean finished = sub.growingRemainingViewport.isEmpty()
                    || sub.growingRemainingViewport.firstRowKey() >= this.parentTableSize
                    || this.isBlinkTable;
            if (log.isDebugEnabled()) {
                log.debug().append(this.logPrefix).append(sub.logPrefix)
                        .append("finalizing snapshot isComplete=").append(finished).endl();
            }
            if (finished) {
                sub.isGrowingViewport = false;
                --this.numGrowingSubscriptions;

                // the target viewport replaces whatever viewport was in use while growing
                if (sub.viewport != null) {
                    sub.viewport.close();
                }
                sub.viewport = sub.targetViewport;
                sub.targetViewport = null;
                if (sub.viewport == null) {
                    // a null viewport means the subscription now covers the whole table
                    ++this.numFullSubscriptions;
                }

                sub.growingRemainingViewport.close();
                sub.growingRemainingViewport = null;
                anyCompleted = true;
            }
        }
        if (anyCompleted) {
            this.buildPostSnapshotViewports(false);
        }
    }

    /**
     * Recomputes {@code postSnapshotColumns}, {@code postSnapshotViewport} and {@code postSnapshotReverseViewport}
     * as the union over the active subscriptions. Previously built viewports are closed before being replaced.
     *
     * @param ignorePending when {@code true}, subscriptions with a pending update are left out of the union
     */
    private void buildPostSnapshotViewports(boolean ignorePending) {
        final RowSetBuilderRandom forwardBuilder = RowSetFactory.builderRandom();
        final RowSetBuilderRandom reverseBuilder = RowSetFactory.builderRandom();
        this.postSnapshotColumns.clear();

        for (final Subscription sub : this.activeSubscriptions) {
            final boolean skipped = ignorePending && sub.hasPendingUpdate;
            if (skipped) {
                continue;
            }
            this.postSnapshotColumns.or(sub.subscribedColumns);
            if (sub.isViewport()) {
                // forward and reverse viewports are accumulated separately
                final RowSetBuilderRandom target = sub.reverseViewport ? reverseBuilder : forwardBuilder;
                target.addRowSet(sub.viewport);
            }
        }

        if (this.postSnapshotViewport != null) {
            this.postSnapshotViewport.close();
        }
        if (this.postSnapshotReverseViewport != null) {
            this.postSnapshotReverseViewport.close();
        }
        this.postSnapshotViewport = forwardBuilder.build();
        this.postSnapshotReverseViewport = reverseBuilder.build();
    }

    /**
     * Makes the post-snapshot viewports and columns the active ones. The caller must hold this object's monitor.
     *
     * <p>
     * The previous active viewports are closed. A non-empty post-snapshot viewport is handed over to the matching
     * active field; an empty one is closed and the active field becomes {@code null}. Both post-snapshot viewport
     * fields end up {@code null}. Object columns present in the post-snapshot column set are added to
     * {@code objectColumnsToClear}, then {@code activeColumns} takes over the post-snapshot column set, which is
     * cleared.
     */
    private void promoteSnapshotToActive() {
        Assert.assertion(Thread.holdsLock(this), "promoteSnapshotToActive must hold lock!");

        if (this.activeViewport != null) {
            this.activeViewport.close();
        }
        if (this.activeReverseViewport != null) {
            this.activeReverseViewport.close();
        }

        final boolean hasForward = this.postSnapshotViewport != null && !this.postSnapshotViewport.isEmpty();
        final boolean hasReverse =
                this.postSnapshotReverseViewport != null && !this.postSnapshotReverseViewport.isEmpty();
        this.activeViewport = hasForward ? this.postSnapshotViewport : null;
        this.activeReverseViewport = hasReverse ? this.postSnapshotReverseViewport : null;

        // Ownership moved to the active field unless the set was empty; empty sets are closed here instead.
        if (!hasForward && this.postSnapshotViewport != null) {
            this.postSnapshotViewport.close();
        }
        this.postSnapshotViewport = null;
        if (!hasReverse && this.postSnapshotReverseViewport != null) {
            this.postSnapshotReverseViewport.close();
        }
        this.postSnapshotReverseViewport = null;

        this.objectColumnsToClear.or(this.postSnapshotColumns);
        this.objectColumnsToClear.and(this.objectColumns);

        this.activeColumns.clear();
        this.activeColumns.or(this.postSnapshotColumns);
        this.postSnapshotColumns.clear();
    }

    /**
     * Reads {@code lastUpdateClockStep} while holding this object's monitor.
     *
     * @return the current value of {@code lastUpdateClockStep}
     */
    private synchronized long getLastUpdateClockStep() {
        final long step = this.lastUpdateClockStep;
        return step;
    }

    /**
     * Builds a snapshot message of the parent table in position space for the given subscriptions.
     *
     * <p>
     * If an {@code onGetSnapshot} hook is installed it runs exactly once: before the snapshot when
     * {@code onGetSnapshotIsPreSnap} is set, otherwise after it.
     *
     * @param snapshotSubscriptions subscriptions handed to the {@link SnapshotControl} guarding this snapshot
     * @param columnsToSnapshot columns to include
     * @param positionsToSnapshot forward positions to include
     * @param reversePositionsToSnapshot reverse positions to include
     * @return the constructed snapshot message
     */
    @VisibleForTesting
    BarrageMessage getSnapshot(List<Subscription> snapshotSubscriptions, BitSet columnsToSnapshot, RowSet positionsToSnapshot, RowSet reversePositionsToSnapshot) {
        final boolean hookInstalled = this.onGetSnapshot != null;
        if (hookInstalled && this.onGetSnapshotIsPreSnap) {
            this.onGetSnapshot.run();
        }

        final SnapshotControl control = new SnapshotControl(snapshotSubscriptions);
        final BarrageMessage snapshot = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(
                this, this.parent, columnsToSnapshot, positionsToSnapshot, reversePositionsToSnapshot, control);

        if (hookInstalled && !this.onGetSnapshotIsPreSnap) {
            this.onGetSnapshot.run();
        }
        return snapshot;
    }

    /**
     * Runs the superclass teardown and then stops the statistics reporter, if one exists.
     */
    @OverridingMethodsMustInvokeSuper
    protected synchronized void destroy() {
        super.destroy();
        if (this.stats == null) {
            return;
        }
        this.stats.stop();
    }

    /**
     * Records one write in the {@code writeBits} and {@code writeTime} histograms.
     *
     * @param bytes number of bytes written; recorded as bits
     * @param cpuNanos time spent writing, recorded as-is
     */
    private void recordWriteMetrics(long bytes, long cpuNanos) {
        final long bits = bytes * Byte.SIZE;
        this.recordMetric(s -> s.writeBits, bits);
        this.recordMetric(s -> s.writeTime, cpuNanos);
    }

    /**
     * Records {@code value} in one of the {@link Stats} histograms. Does nothing when no stats object exists.
     * The recording happens while holding the stats object's monitor.
     *
     * @param hist selects which histogram of the stats object receives the value
     * @param value the value to record
     */
    private void recordMetric(Function<Stats, Histogram> hist, long value) {
        final Stats target = this.stats;
        if (target != null) {
            synchronized (target) {
                final Histogram selected = hist.apply(target);
                selected.recordValue(value);
            }
        }
    }

    /**
     * Reports the refreshing state of the parent table.
     *
     * @return whatever {@code parent.isRefreshing()} returns
     */
    public boolean isRefreshing() {
        final boolean parentRefreshing = this.parent.isRefreshing();
        return parentRefreshing;
    }

    /**
     * Accepts a refreshing request only if it does not contradict the parent table. The refreshing state itself is
     * never changed here.
     *
     * @param refreshing the requested state
     * @return the parent's refreshing state
     * @throws UnsupportedOperationException if {@code refreshing} is {@code true} while the parent is not refreshing
     */
    public boolean setRefreshing(boolean refreshing) {
        final boolean wouldEnableOnStaticParent = !this.parent.isRefreshing() && refreshing;
        if (wouldEnableOnStaticParent) {
            throw new UnsupportedOperationException("cannot modify the source table's refreshing state");
        }
        return this.parent.isRefreshing();
    }

    /**
     * Registers {@code parent} as a parent of this producer, provided
     * {@code DynamicNode.notDynamicOrIsRefreshing(parent)} holds; otherwise the call does nothing.
     *
     * <p>
     * An accepted parent is added to {@code parents}, managed if it is a {@link LivenessReferent}, and passed to
     * the parent table's {@code getUpdateGraph} if it is a {@code NotificationQueue.Dependency}.
     *
     * @param parent the object to register; note that this parameter shadows the {@code parent} field
     */
    public void addParentReference(Object parent) {
        if (!DynamicNode.notDynamicOrIsRefreshing(parent)) {
            return;
        }
        this.setRefreshing(true);
        this.parents.add(parent);

        if (parent instanceof LivenessReferent) {
            this.manage((LivenessReferent) parent);
        }
        if (parent instanceof NotificationQueue.Dependency) {
            final NotificationQueue.Dependency dependency = (NotificationQueue.Dependency) parent;
            // the return value is deliberately unused, as in the original call
            this.parent.getUpdateGraph(new NotificationQueue.Dependency[] {dependency});
        }
    }

    /**
     * Advances {@code lastUpdateClockStep} to {@code lastNotificationStep}. The stored step never moves backwards.
     *
     * @param lastNotificationStep the candidate step
     */
    public synchronized void setLastNotificationStep(long lastNotificationStep) {
        if (lastNotificationStep > this.lastUpdateClockStep) {
            this.lastUpdateClockStep = lastNotificationStep;
        }
    }

    /**
     * Synthetic lambda body (decompiled from {@code enqueueUpdate}) invoked once per position range
     * {@code [posStart, posEnd]}.
     *
     * <p>
     * It translates the position range into a key range for the current row set (after passing the keys through
     * {@code inverter.mapToPrevKeyspace}) and into a key range for {@code prevRowSet}, then adds to
     * {@code scopedViewBuilder} the parts of the current key range that fall outside the previous key range.
     *
     * @param sub the subscription; only {@code sub.reverseViewport} is read
     * @param rowSet the current row set
     * @param inverter helper used to map current keys; presumably into the pre-shift keyspace - confirm against
     *        {@code ShiftInversionHelper}
     * @param prevRowSet the previous row set
     * @param scopedViewBuilder receives the resulting key ranges
     * @param posStart first position of the range (inclusive)
     * @param posEnd last position of the range (inclusive)
     */
    private static /* synthetic */ void lambda$enqueueUpdate$6(Subscription sub, TrackingRowSet rowSet, ShiftInversionHelper inverter, RowSet prevRowSet, RowSetBuilderRandom scopedViewBuilder, long posStart, long posEnd) {
        long prevKeyEnd;
        long prevEnd;
        long prevStart;
        long currKeyStart;
        long currKeyEnd;
        long localEnd;
        long localStart;
        // Step 1: positions within the current row set. Reverse viewports count from the last row.
        if (sub.reverseViewport) {
            long lastRowPosition = rowSet.size() - 1L;
            localStart = Math.max(lastRowPosition - posEnd, 0L);
            localEnd = lastRowPosition - posStart;
            if (localEnd < 0L) {
                // the whole range lies before the first row
                return;
            }
        } else {
            localStart = posStart;
            localEnd = posEnd;
        }
        // Step 2: positions -> keys (clamped to the last row), mapped through the inverter.
        // NOTE(review): both branches make the same two calls but in opposite order; presumably the inverter is
        // stateful and sensitive to call order - confirm before merging the branches.
        if (sub.reverseViewport) {
            currKeyEnd = inverter.mapToPrevKeyspace(rowSet.get(Math.min(localEnd, rowSet.size() - 1L)), true);
            currKeyStart = inverter.mapToPrevKeyspace(rowSet.get(Math.min(localStart, rowSet.size() - 1L)), false);
        } else {
            currKeyStart = inverter.mapToPrevKeyspace(rowSet.get(Math.min(localStart, rowSet.size() - 1L)), false);
            currKeyEnd = inverter.mapToPrevKeyspace(rowSet.get(Math.min(localEnd, rowSet.size() - 1L)), true);
        }
        if (currKeyEnd < currKeyStart) {
            // empty key range after mapping
            return;
        }
        // Step 3: the same position range expressed against the previous row set.
        if (sub.reverseViewport) {
            long lastPrevRowPosition = prevRowSet.size() - 1L;
            prevStart = Math.max(lastPrevRowPosition - posEnd, 0L);
            prevEnd = lastPrevRowPosition - posStart;
        } else {
            prevStart = localStart;
            prevEnd = localEnd;
        }
        // A start beyond the previous row set maps to one past its last key; a negative end maps to -1.
        long prevKeyStart = prevStart >= prevRowSet.size() ? prevRowSet.lastRowKey() + 1L : prevRowSet.get(prevStart);
        long l = prevKeyEnd = prevEnd < 0L ? -1L : prevRowSet.get(Math.min(prevEnd, prevRowSet.size() - 1L));
        // Step 4: emit the portions of [currKeyStart, currKeyEnd] outside [prevKeyStart, prevKeyEnd].
        if (currKeyStart < prevKeyStart) {
            scopedViewBuilder.addRange(currKeyStart, Math.min(prevKeyStart - 1L, currKeyEnd));
        }
        if (currKeyEnd > prevKeyEnd) {
            scopedViewBuilder.addRange(Math.max(prevKeyEnd + 1L, currKeyStart), currKeyEnd);
        }
    }

    /**
     * Runnable that drives {@code updateSubscriptionsSnapshotAndPropagate()} on the enclosing producer.
     *
     * <p>
     * {@code needsRun} records that a run has been requested; {@code runLock} ensures at most one thread executes
     * the propagation at a time. A thread that cannot take the lock returns immediately and relies on the lock
     * holder re-checking {@code needsRun} in its loop condition.
     */
    private class UpdatePropagationJob
    implements Runnable {
        // held while a propagation pass is executing
        private final ReentrantLock runLock = new ReentrantLock();
        // set when a pass has been requested and not yet started
        private final AtomicBoolean needsRun = new AtomicBoolean();

        private UpdatePropagationJob() {
        }

        /**
         * Requests a pass and then executes passes for as long as {@code needsRun} is set and the lock can be taken.
         *
         * <p>
         * If a pass throws an {@link Exception}, the error is transformed and delivered to every distinct active or
         * pending subscription listener, and both subscription lists are cleared, all under the producer's monitor.
         * (CFR note: the decompiler reported removing a self-catching try here, so the structure may differ slightly
         * from the original source.)
         */
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            this.needsRun.set(true);
            do {
                if (!this.runLock.tryLock()) {
                    // another thread is running; it will observe needsRun in its own loop condition
                    return;
                }
                try {
                    // claim the request; if someone else already claimed it, fall through to the loop condition
                    if (!this.needsRun.compareAndSet(true, false)) continue;
                    long startTm = System.nanoTime();
                    BarrageMessageProducer.this.updateSubscriptionsSnapshotAndPropagate();
                    BarrageMessageProducer.this.recordMetric(stats -> stats.updateJob, System.nanoTime() - startTm);
                }
                catch (Exception exception) {
                    BarrageMessageProducer barrageMessageProducer = BarrageMessageProducer.this;
                    synchronized (barrageMessageProducer) {
                        StatusRuntimeException apiError = BarrageMessageProducer.this.errorTransformer.transform(exception);
                        // notify each listener once, even if a subscription is in both lists
                        Stream.concat(BarrageMessageProducer.this.activeSubscriptions.stream(), BarrageMessageProducer.this.pendingSubscriptions.stream()).distinct().forEach(sub -> GrpcUtil.safelyError(sub.listener, (StatusRuntimeException)apiError));
                        BarrageMessageProducer.this.activeSubscriptions.clear();
                        BarrageMessageProducer.this.pendingSubscriptions.clear();
                    }
                }
                finally {
                    this.runLock.unlock();
                }
                // a request that arrived during the pass triggers another iteration
            } while (this.needsRun.get());
        }

        /**
         * Requests a pass and submits this job to the scheduler for immediate execution, but only when this call
         * flipped {@code needsRun} from false to true and no pass currently holds the lock.
         */
        public void scheduleImmediately() {
            if (this.needsRun.compareAndSet(false, true) && !this.runLock.isLocked()) {
                BarrageMessageProducer.this.scheduler.runImmediately(this);
            }
        }

        /**
         * Submits this job to the scheduler to run at the given time.
         *
         * @param nextRunTimeMillis the time passed to {@code scheduler.runAtTime}
         */
        public void scheduleAt(long nextRunTimeMillis) {
            BarrageMessageProducer.this.scheduler.runAtTime(nextRunTimeMillis, this);
        }
    }

    /**
     * Collects timing and size histograms for the enclosing producer and periodically flushes them to the
     * subscription performance logger. The instance reschedules itself on the producer's scheduler every
     * {@code BarragePerformanceLog.CYCLE_DURATION_MILLIS} until {@link #stop()} is called.
     */
    private class Stats
    implements Runnable {
        private final int NUM_SIG_FIGS = 3;
        public final String tableId;
        public final String tableKey;
        public final Histogram enqueue = new Histogram(NUM_SIG_FIGS);
        public final Histogram aggregate = new Histogram(NUM_SIG_FIGS);
        public final Histogram propagate = new Histogram(NUM_SIG_FIGS);
        public final Histogram snapshot = new Histogram(NUM_SIG_FIGS);
        public final Histogram updateJob = new Histogram(NUM_SIG_FIGS);
        public final Histogram writeTime = new Histogram(NUM_SIG_FIGS);
        public final Histogram writeBits = new Histogram(NUM_SIG_FIGS);
        private volatile boolean running = true;

        /**
         * Creates the histograms and schedules the first flush.
         *
         * @param tableKey key reported alongside every flushed histogram
         */
        public Stats(String tableKey) {
            // the parent's identity hash, in hex, serves as the table id
            this.tableId = Integer.toHexString(System.identityHashCode(BarrageMessageProducer.this.parent));
            this.tableKey = tableKey;
            BarrageMessageProducer.this.scheduler.runAfterDelay(BarragePerformanceLog.CYCLE_DURATION_MILLIS, this);
        }

        /** Prevents further flushes; the next scheduled {@link #run()} returns without rescheduling. */
        public void stop() {
            this.running = false;
        }

        /**
         * Schedules the next cycle and flushes every non-empty histogram while holding the logger's monitor.
         */
        @Override
        public synchronized void run() {
            if (!this.running) {
                return;
            }
            final Instant now = BarrageMessageProducer.this.scheduler.instantMillis();
            BarrageMessageProducer.this.scheduler.runAfterDelay(BarragePerformanceLog.CYCLE_DURATION_MILLIS, this);

            final BarrageSubscriptionPerformanceLogger logger =
                    BarragePerformanceLog.getInstance().getSubscriptionLogger();
            synchronized (logger) {
                this.flush(now, logger, this.enqueue, "EnqueueMillis");
                this.flush(now, logger, this.aggregate, "AggregateMillis");
                this.flush(now, logger, this.propagate, "PropagateMillis");
                this.flush(now, logger, this.snapshot, "SnapshotMillis");
                this.flush(now, logger, this.updateJob, "UpdateJobMillis");
                this.flush(now, logger, this.writeTime, "WriteMillis");
                this.flush(now, logger, this.writeBits, "WriteMegabits");
            }
        }

        /**
         * Logs {@code hist} under {@code statType} and resets it; histograms without samples are skipped.
         */
        private void flush(Instant now, BarrageSubscriptionPerformanceLogger logger, Histogram hist, String statType) {
            if (hist.getTotalCount() != 0L) {
                logger.log(this.tableId, this.tableKey, statType, now, hist);
                hist.reset();
            }
        }
    }

    /**
     * Mutable per-client subscription state. Fields are read and written directly by the enclosing producer.
     */
    private static class Subscription {
        private final BarrageSubscriptionOptions options;
        private final StreamObserver<BarrageMessageWriter.MessageView> listener;
        private final String logPrefix;

        // state currently in effect; a null viewport means "not a viewport subscription" (see isViewport)
        private RowSet viewport;
        private BitSet subscribedColumns;
        private boolean reverseViewport;

        // lifecycle flags
        private boolean isActive;
        private boolean pendingDelete;
        private boolean hasPendingUpdate;
        private boolean pendingInitialSnapshot = true;

        // requested state that has not been applied yet
        private RowSet pendingViewport;
        private boolean pendingReverseViewport;
        private BitSet pendingColumns;

        // counterparts exchanged with viewport/subscribedColumns/reverseViewport by
        // flipSnapshotStateForSubscriptions
        private WritableRowSet snapshotViewport;
        private BitSet snapshotColumns;
        private boolean snapshotReverseViewport;

        // growing-viewport bookkeeping
        private RowSet targetViewport;
        private BitSet targetColumns;
        private boolean targetReverseViewport;
        private boolean isGrowingViewport;
        private WritableRowSet growingRemainingViewport;
        private WritableRowSet growingIncrementalViewport;
        private boolean isFirstSnapshot;

        /**
         * Creates a subscription whose effective viewport and column set start out empty; the requested columns and
         * viewport are stored as pending state.
         *
         * @param listener observer that receives this subscription's messages
         * @param options the subscription options
         * @param subscribedColumns requested columns, stored as {@code pendingColumns}
         * @param initialViewport requested viewport, stored as {@code pendingViewport}; may be {@code null}
         * @param reverseViewport requested direction, stored as both the current and the pending direction
         */
        private Subscription(StreamObserver<BarrageMessageWriter.MessageView> listener, BarrageSubscriptionOptions options, BitSet subscribedColumns, @Nullable RowSet initialViewport, boolean reverseViewport) {
            this.listener = listener;
            this.options = options;
            this.logPrefix = "Sub{" + Integer.toHexString(System.identityHashCode(listener)) + "}: ";

            this.viewport = RowSetFactory.empty();
            this.subscribedColumns = new BitSet();

            this.pendingColumns = subscribedColumns;
            this.pendingViewport = initialViewport;
            this.reverseViewport = reverseViewport;
            this.pendingReverseViewport = reverseViewport;
        }

        /** @return {@code true} when a viewport is set, i.e. {@code viewport} is not {@code null} */
        public boolean isViewport() {
            return this.viewport != null;
        }

        /**
         * @return {@code true} when there is no viewport, when a pending update carries no viewport, or when the
         *         subscription is growing towards a {@code null} target viewport
         */
        public boolean isFullSubscription() {
            if (!this.isViewport()) {
                return true;
            }
            if (this.hasPendingUpdate && this.pendingViewport == null) {
                return true;
            }
            return this.isGrowingViewport && this.targetViewport == null;
        }
    }

    /**
     * Listener attached to the parent table. Each upstream update is enqueued on the enclosing producer (when there
     * are active subscriptions) and the producer's table size and last-update clock step are refreshed.
     */
    private class DeltaListener
    extends InstrumentedTableUpdateListener {
        DeltaListener() {
            super("BarrageMessageProducer(" + BarrageMessageProducer.this.parent.getReferentDescription() + ")");
            Assert.assertion(BarrageMessageProducer.this.parentIsRefreshing, "parent.isRefreshing()");
            this.manage((LivenessReferent) BarrageMessageProducer.this.parent);
            BarrageMessageProducer.this.addParentReference(this);
        }

        /**
         * Handles one upstream update while holding the producer's monitor.
         *
         * <p>
         * Any {@link Exception} raised along the way forces this listener's reference count to zero, is stored as
         * the producer's {@code pendingError}, and triggers a propagation.
         *
         * @param upstream the update delivered by the parent table
         */
        public void onUpdate(TableUpdate upstream) {
            final BarrageMessageProducer producer = BarrageMessageProducer.this;
            synchronized (producer) {
                try {
                    // an update must arrive on a strictly later step than the last one recorded
                    if (producer.lastUpdateClockStep >= producer.parent.getUpdateGraph().clock().currentStep()) {
                        throw new IllegalStateException(producer.logPrefix + "lastUpdateClockStep=" + producer.lastUpdateClockStep + " >= notification on " + producer.parent.getUpdateGraph().clock().currentStep());
                    }

                    final boolean hasActiveSubscriptions = !producer.activeSubscriptions.isEmpty();
                    if (hasActiveSubscriptions) {
                        final long enqueueStart = System.nanoTime();
                        producer.enqueueUpdate(upstream);
                        producer.recordMetric(s -> s.enqueue, System.nanoTime() - enqueueStart);
                        producer.schedulePropagation();
                    }

                    producer.parentTableSize = producer.parent.size();
                    producer.lastUpdateClockStep = producer.parent.getUpdateGraph().clock().currentStep();

                    if (log.isDebugEnabled()) {
                        try (final WritableRowSet prevRowSet = producer.parent.getRowSet().copyPrev()) {
                            log.debug().append(producer.logPrefix)
                                    .append("lastUpdateClockStep=").append(producer.lastUpdateClockStep)
                                    .append(", upstream=").append((LogOutputAppendable) upstream)
                                    .append(", shouldEnqueueDelta=").append(hasActiveSubscriptions)
                                    .append(", rowSet=").append((LogOutputAppendable) producer.parent.getRowSet())
                                    .append(", prevRowSet=").append((LogOutputAppendable) prevRowSet)
                                    .endl();
                        }
                    }
                } catch (Exception err) {
                    this.forceReferenceCountToZero();
                    producer.pendingError = err;
                    producer.schedulePropagation();
                }
            }
        }

        /**
         * Stores the upstream failure as the producer's {@code pendingError} and triggers a propagation, under the
         * producer's monitor.
         */
        protected void onFailureInternal(Throwable originalException, TableListener.Entry sourceEntry) {
            final BarrageMessageProducer producer = BarrageMessageProducer.this;
            synchronized (producer) {
                producer.pendingError = originalException;
                producer.schedulePropagation();
            }
        }

        /** Runs the superclass teardown, then detaches this listener from the parent table. */
        @OverridingMethodsMustInvokeSuper
        public void destroy() {
            super.destroy();
            BarrageMessageProducer.this.parent.removeUpdateListener((TableUpdateListener) this);
        }
    }

    /**
     * Bundles the contexts needed to copy values for one column from a source chunk source into its delta column.
     * Closing this object closes both contexts.
     */
    private static class FillDeltaContext
    implements SafeCloseable {
        final int columnIndex;
        final ChunkSource.WithPrev<Values> chunkSource;
        final WritableColumnSource<?> deltaColumn;
        final ChunkSource.GetContext sourceGetContext;
        final ChunkSink.FillFromContext deltaFillContext;

        /**
         * Creates the get-context on {@code chunkSource} and the fill-from-context on {@code deltaColumn}. If the
         * second context cannot be created, the first one is closed before the failure propagates.
         *
         * @param columnIndex index of the column this context serves
         * @param chunkSource source the values are read from
         * @param deltaColumn destination column the values are written to
         * @param sharedContext shared context passed to {@code makeGetContext}
         * @param chunkSize capacity requested for both contexts
         */
        public FillDeltaContext(int columnIndex, ChunkSource.WithPrev<Values> chunkSource, WritableColumnSource<?> deltaColumn, SharedContext sharedContext, int chunkSize) {
            this.columnIndex = columnIndex;
            this.chunkSource = chunkSource;
            this.deltaColumn = deltaColumn;

            final ChunkSource.GetContext getContext = chunkSource.makeGetContext(chunkSize, sharedContext);
            final ChunkSink.FillFromContext fillFromContext;
            try {
                fillFromContext = deltaColumn.makeFillFromContext(chunkSize);
            } catch (RuntimeException | Error failure) {
                // do not leak the already-created get-context when the second allocation fails
                try {
                    getContext.close();
                } catch (RuntimeException closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
                throw failure;
            }
            this.sourceGetContext = getContext;
            this.deltaFillContext = fillFromContext;
        }

        /**
         * Reads {@code srcKeys} from the source and writes the resulting chunk into the delta column at
         * {@code dstKeys}.
         *
         * @param srcKeys rows to read from {@code chunkSource}
         * @param dstKeys rows to write in {@code deltaColumn}
         */
        public void doFillChunk(RowSequence srcKeys, RowSequence dstKeys) {
            this.deltaColumn.fillFromChunk(this.deltaFillContext, this.chunkSource.getChunk(this.sourceGetContext, srcKeys), dstKeys);
        }

        /**
         * Closes both contexts. The fill-from-context is closed even if closing the get-context throws.
         */
        public void close() {
            try {
                this.sourceGetContext.close();
            } finally {
                this.deltaFillContext.close();
            }
        }
    }

    /**
     * One recorded upstream update together with the rows whose data was captured for it. The constructor stores a
     * copy of the update; {@link #close()} releases that copy and closes both recorded row sets.
     */
    private static final class Delta
    implements SafeCloseable {
        private final long step;
        private final long deltaColumnOffset;
        private final TableUpdate update;
        private final WritableRowSet recordedAdds;
        private final RowSet recordedMods;
        private final BitSet subscribedColumns;
        private final BitSet modifiedColumns;

        /**
         * @param step step associated with this delta
         * @param deltaColumnOffset offset of this delta's rows within the delta columns
         * @param update the upstream update; a copy made with {@code TableUpdateImpl.copy} is retained
         * @param recordedAdds recorded added rows; closed by {@link #close()}
         * @param recordedMods recorded modified rows; closed by {@link #close()}
         * @param subscribedColumns subscribed column set stored with this delta
         * @param modifiedColumns modified column set stored with this delta
         */
        private Delta(long step, long deltaColumnOffset, TableUpdate update, WritableRowSet recordedAdds, RowSet recordedMods, BitSet subscribedColumns, BitSet modifiedColumns) {
            this.step = step;
            this.deltaColumnOffset = deltaColumnOffset;
            this.update = TableUpdateImpl.copy(update);
            this.recordedAdds = recordedAdds;
            this.recordedMods = recordedMods;
            this.subscribedColumns = subscribedColumns;
            this.modifiedColumns = modifiedColumns;
        }

        /**
         * Releases the copied update and closes both recorded row sets, in that order. Each step runs even if an
         * earlier one throws, so a failure cannot leak the remaining resources.
         */
        public void close() {
            try {
                this.update.release();
            } finally {
                try {
                    this.recordedAdds.close();
                } finally {
                    this.recordedMods.close();
                }
            }
        }
    }

    /**
     * Snapshot control used by {@code getSnapshot}. It decides whether a snapshot reads previous values, verifies
     * that no update was recorded on the producer while the snapshot ran, and promotes the snapshot state on the
     * producer once the snapshot completes consistently.
     */
    private class SnapshotControl
    implements ConstructSnapshot.SnapshotControl {
        long capturedLastUpdateClockStep;
        long resultValidStep = -1L;
        final List<Subscription> snapshotSubscriptions;

        SnapshotControl(List<Subscription> snapshotSubscriptions) {
            this.snapshotSubscriptions = snapshotSubscriptions;
        }

        /**
         * Captures the producer's last update step and decides whether previous values must be used.
         *
         * @param beforeClockValue clock value observed before the snapshot attempt
         * @return {@code false} for a non-refreshing parent or an idle clock; otherwise {@code true} exactly when
         *         the producer has not yet been notified on the clock's current step
         */
        public Boolean usePreviousValues(long beforeClockValue) {
            if (!BarrageMessageProducer.this.parentIsRefreshing) {
                return false;
            }
            this.capturedLastUpdateClockStep = BarrageMessageProducer.this.getLastUpdateClockStep();

            final LogicalClock.State stateBefore = LogicalClock.getState(beforeClockValue);
            final long stepBefore = LogicalClock.getStep(beforeClockValue);
            if (stateBefore == LogicalClock.State.Idle) {
                this.resultValidStep = stepBefore;
                return false;
            }

            final boolean alreadyNotified = stepBefore == this.capturedLastUpdateClockStep;
            final boolean usePrevious = !alreadyNotified;
            if (alreadyNotified) {
                this.resultValidStep = stepBefore;
            } else {
                // previous values belong to the step before the one in progress
                this.resultValidStep = stepBefore - 1L;
            }
            if (log.isDebugEnabled()) {
                log.debug().append(BarrageMessageProducer.this.logPrefix)
                        .append("usePreviousValues: usePrevious=").append(usePrevious)
                        .append(", beforeStep=").append(stepBefore)
                        .append(", lastUpdateStep=").append(this.capturedLastUpdateClockStep)
                        .endl();
            }
            return usePrevious;
        }

        /**
         * @return {@code true} for a non-refreshing parent, or when the producer's last update step still equals
         *         the step captured in {@link #usePreviousValues(long)}
         */
        public boolean snapshotConsistent(long currentClockValue, boolean usingPreviousValues) {
            if (BarrageMessageProducer.this.parentIsRefreshing) {
                return this.capturedLastUpdateClockStep == BarrageMessageProducer.this.getLastUpdateClockStep();
            }
            return true;
        }

        /**
         * Re-checks consistency under the producer's monitor. On success the subscriptions' snapshot state is
         * flipped and finalized, the snapshot viewports become active, and {@code blinkTableUpdateSize} is reset;
         * on failure {@code resultValidStep} is reset to {@code -1}.
         *
         * @return whether the snapshot completed consistently
         */
        public boolean snapshotCompletedConsistently(long afterClockValue, boolean usedPreviousValues) {
            final BarrageMessageProducer producer = BarrageMessageProducer.this;
            final boolean consistent;
            synchronized (producer) {
                consistent = this.snapshotConsistent(afterClockValue, usedPreviousValues);
                if (consistent) {
                    producer.flipSnapshotStateForSubscriptions(this.snapshotSubscriptions);
                    producer.finalizeSnapshotForSubscriptions(this.snapshotSubscriptions);
                    producer.promoteSnapshotToActive();
                    producer.blinkTableUpdateSize = 0L;
                } else {
                    this.resultValidStep = -1L;
                }
            }
            if (log.isDebugEnabled()) {
                log.debug().append(producer.logPrefix)
                        .append("success=").append(consistent)
                        .append(", validStep=").append(this.resultValidStep)
                        .append(", numSnapshotSubscriptions=").append(this.snapshotSubscriptions.size())
                        .endl();
            }
            return consistent;
        }

        /** @return the parent's update graph when the parent is refreshing, otherwise {@code null} */
        public UpdateGraph getUpdateGraph() {
            if (BarrageMessageProducer.this.parent.isRefreshing()) {
                return BarrageMessageProducer.this.parent.getUpdateGraph();
            }
            return null;
        }
    }

    private static class MyMemoKey
    extends MemoizedOperationKey {
        private final long interval;

        private MyMemoKey(long interval) {
            this.interval = interval;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || ((Object)((Object)this)).getClass() != o.getClass()) {
                return false;
            }
            MyMemoKey that = (MyMemoKey)((Object)o);
            return this.interval == that.interval;
        }

        public int hashCode() {
            return Long.hashCode(this.interval);
        }
    }

    /**
     * Memoizable operation that builds a {@link BarrageMessageProducer} for a parent table.
     * <p>
     * The operation only stores its constructor arguments; the producer itself is created in
     * {@link #initialize(boolean, long)}. The memoization key depends solely on {@code updateIntervalMs}.
     */
    public static class Operation
    implements QueryTable.MemoizableOperation<BarrageMessageProducer> {
        private final Scheduler scheduler;
        private final SessionService.ErrorTransformer errorTransformer;
        private final BarrageMessageWriter.Factory streamGeneratorFactory;
        private final BaseTable<?> parent;
        private final long updateIntervalMs;
        /** Optional hook handed to the producer; {@code null} when created via the five-argument constructor. */
        private final Runnable onGetSnapshot;

        /**
         * Creates an operation with no {@code onGetSnapshot} hook.
         *
         * @param scheduler passed through to the producer
         * @param errorTransformer passed through to the producer
         * @param streamGeneratorFactory passed through to the producer
         * @param parent the table the producer is built for
         * @param updateIntervalMs passed through to the producer and used as the memoization key
         */
        public Operation(Scheduler scheduler, SessionService.ErrorTransformer errorTransformer, BarrageMessageWriter.Factory streamGeneratorFactory, BaseTable<?> parent, long updateIntervalMs) {
            this(scheduler, errorTransformer, streamGeneratorFactory, parent, updateIntervalMs, null);
        }

        /**
         * Creates an operation, optionally with an {@code onGetSnapshot} hook.
         *
         * @param scheduler passed through to the producer
         * @param errorTransformer passed through to the producer
         * @param streamGeneratorFactory passed through to the producer
         * @param parent the table the producer is built for
         * @param updateIntervalMs passed through to the producer and used as the memoization key
         * @param onGetSnapshot passed through to the producer; may be {@code null}.
         *        NOTE(review): presumably run by the producer when it takes a snapshot - confirm against
         *        the producer's constructor.
         */
        @VisibleForTesting
        public Operation(Scheduler scheduler, SessionService.ErrorTransformer errorTransformer, BarrageMessageWriter.Factory streamGeneratorFactory, BaseTable<?> parent, long updateIntervalMs, @Nullable Runnable onGetSnapshot) {
            this.scheduler = scheduler;
            this.errorTransformer = errorTransformer;
            this.streamGeneratorFactory = streamGeneratorFactory;
            this.parent = parent;
            this.updateIntervalMs = updateIntervalMs;
            this.onGetSnapshot = onGetSnapshot;
        }

        /**
         * @return a description containing the update interval and the identity hash code of the parent
         */
        public String getDescription() {
            return "BarrageMessageProducer(" + this.updateIntervalMs + "," + System.identityHashCode(this.parent) + ")";
        }

        /**
         * @return a log prefix containing the identity hash code of this operation
         */
        public String getLogPrefix() {
            return "BarrageMessageProducer.Operation(" + System.identityHashCode(this) + "): ";
        }

        /**
         * @return a key that compares equal for operations with the same {@code updateIntervalMs}
         */
        public MemoizedOperationKey getMemoizedOperationKey() {
            return new MyMemoKey(this.updateIntervalMs);
        }

        /**
         * Builds the producer and pairs it with the listener it constructs.
         *
         * @param usePrev not consulted by this implementation
         * @param beforeClock not consulted by this implementation
         * @return a result holding the new producer and the listener from {@code constructListener()}
         */
        public QueryTable.Operation.Result<BarrageMessageProducer> initialize(boolean usePrev, long beforeClock) {
            final BarrageMessageProducer result = new BarrageMessageProducer(this.scheduler, this.errorTransformer, this.streamGeneratorFactory, this.parent, this.updateIntervalMs, this.onGetSnapshot);
            // Diamond instead of the raw Result type: avoids the unchecked conversion to
            // Result<BarrageMessageProducer> and lets the compiler verify the node argument's type.
            return new QueryTable.Operation.Result<>(result, (TableUpdateListener) result.constructListener());
        }

        /**
         * Factory for {@link Operation} instances.
         */
        public interface Factory {
            /**
             * Creates an operation for the given table.
             *
             * @param parent the table to create the operation for
             * @param updateIntervalMs NOTE(review): presumably the update interval in milliseconds, mirroring
             *        the trailing arguments of the {@link Operation} constructors - confirm against
             *        implementations
             * @return the new operation
             */
            Operation create(BaseTable<?> parent, long updateIntervalMs);
        }
    }
}

