/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.server.table;

import com.google.rpc.Code;
import dagger.assisted.Assisted;
import dagger.assisted.AssistedFactory;
import dagger.assisted.AssistedInject;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.SnapshotUnsuccessfulException;
import io.deephaven.engine.exceptions.TableAlreadyFailedException;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.LivenessStateException;
import io.deephaven.engine.liveness.ReferenceCountedLivenessReferent;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener;
import io.deephaven.engine.table.impl.NotificationStepReceiver;
import io.deephaven.engine.table.impl.OperationSnapshotControl;
import io.deephaven.engine.table.impl.UncoalescedTable;
import io.deephaven.engine.table.impl.remote.ConstructSnapshot;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.hash.KeyedLongObjectHashMap;
import io.deephaven.hash.KeyedLongObjectKey;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.backplane.grpc.ExportedTableUpdateMessage;
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.session.SessionState;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.mutable.MutableLong;
import io.grpc.stub.StreamObserver;
import javax.annotation.OverridingMethodsMustInvokeSuper;
import org.jetbrains.annotations.NotNull;

public class ExportedTableUpdateListener
implements StreamObserver<ExportNotification> {
    private static final Logger log = LoggerFactory.getLogger(ExportedTableUpdateListener.class);
    private final SessionState session;
    private final String logPrefix;
    private final StreamObserver<ExportedTableUpdateMessage> responseObserver;
    private final SessionService.ErrorTransformer errorTransformer;
    private final KeyedLongObjectHashMap<ListenerImpl> updateListenerMap = new KeyedLongObjectHashMap(EXPORT_KEY);
    private volatile boolean isDestroyed = false;
    private static final KeyedLongObjectKey<ListenerImpl> EXPORT_KEY = new KeyedLongObjectKey.BasicStrict<ListenerImpl>(){

        public long getLongKey(@NotNull ListenerImpl listener) {
            return listener.exportId;
        }
    };
    private static final NotificationStepReceiver NOOP_NOTIFICATION_STEP_RECEIVER = lastNotificationStep -> {};

    @AssistedInject
    public ExportedTableUpdateListener(@Assisted SessionState session, @Assisted StreamObserver<ExportedTableUpdateMessage> responseObserver, SessionService.ErrorTransformer errorTransformer) {
        this.session = session;
        this.logPrefix = "ExportedTableUpdateListener(" + Integer.toHexString(System.identityHashCode(this)) + ") ";
        this.responseObserver = responseObserver;
        this.errorTransformer = errorTransformer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onNext(ExportNotification notification) {
        block16: {
            if (this.isDestroyed) {
                throw Exceptions.statusRuntimeException((Code)Code.CANCELLED, (String)"client cancelled the stream");
            }
            Ticket ticket = notification.getTicket();
            int exportId = ExportTicketHelper.ticketToExportId((Ticket)ticket, (String)"ticket");
            if (exportId == 0) {
                return;
            }
            try {
                ListenerImpl listener;
                ExportNotification.State state = notification.getExportState();
                if (state == ExportNotification.State.EXPORTED) {
                    SessionState.ExportObject export = this.session.getExport(exportId);
                    if (!export.tryRetainReference()) break block16;
                    try {
                        Object obj = export.get();
                        if (!(obj instanceof BaseTable)) break block16;
                        try (SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(((NotificationQueue.Dependency)obj).getUpdateGraph()).open();){
                            this.onNewTableExport(ticket, exportId, (BaseTable)obj);
                            break block16;
                        }
                    }
                    finally {
                        export.dropReference();
                    }
                }
                if (SessionState.isExportStateTerminal(state) && (listener = (ListenerImpl)((Object)this.updateListenerMap.remove((long)exportId))) != null) {
                    listener.dropReference();
                }
            }
            catch (LivenessStateException state) {
            }
            catch (RuntimeException err) {
                log.error().append((CharSequence)this.logPrefix).append((CharSequence)"unexpected failure when processing export notification: ").append((Throwable)err).endl();
            }
        }
    }

    public void onError(Throwable t) {
        this.onCompleted();
    }

    public synchronized void onCompleted() {
        if (this.isDestroyed) {
            return;
        }
        this.isDestroyed = true;
        GrpcUtil.safelyComplete(this.responseObserver);
        this.updateListenerMap.forEach(ReferenceCountedLivenessReferent::dropReference);
        this.updateListenerMap.clear();
        log.info().append((CharSequence)this.logPrefix).append((CharSequence)"is complete").endl();
    }

    private synchronized void onNewTableExport(Ticket ticket, int exportId, BaseTable<?> table) {
        if (table instanceof UncoalescedTable) {
            return;
        }
        if (!table.isRefreshing()) {
            this.sendUpdateMessage(ticket, table.size(), null);
            return;
        }
        if (table.isFailed()) {
            this.sendUpdateMessage(ticket, -1L, (Throwable)Exceptions.statusRuntimeException((Code)Code.FAILED_PRECONDITION, (String)"Exported Table Already Failed"));
            return;
        }
        if (this.updateListenerMap.contains((Object)exportId)) {
            return;
        }
        OperationSnapshotControl snapshotControl = new OperationSnapshotControl(table);
        ListenerImpl listener = new ListenerImpl(table, exportId);
        listener.tryRetainReference();
        this.updateListenerMap.put((long)exportId, (Object)listener);
        try {
            MutableLong initSize = new MutableLong();
            BaseTable.initializeWithSnapshot((String)this.logPrefix, (ConstructSnapshot.SnapshotControl)snapshotControl, (usePrev, beforeClockValue) -> {
                snapshotControl.setListenerAndResult((TableUpdateListener)listener, NOOP_NOTIFICATION_STEP_RECEIVER);
                TrackingRowSet rowSet = table.getRowSet();
                initSize.set(usePrev ? rowSet.sizePrev() : rowSet.size());
                return true;
            });
            this.sendUpdateMessage(ticket, initSize.get(), null);
        }
        catch (SnapshotUnsuccessfulException err) {
            if (err.getCause() instanceof TableAlreadyFailedException) {
                this.sendUpdateMessage(ticket, -1L, (Throwable)Exceptions.statusRuntimeException((Code)Code.FAILED_PRECONDITION, (String)"Exported Table Already Failed"));
            }
            this.sendUpdateMessage(ticket, -1L, (Throwable)this.errorTransformer.transform(err));
        }
    }

    private synchronized void sendUpdateMessage(Ticket ticket, long size, Throwable error) {
        if (this.isDestroyed) {
            return;
        }
        ExportedTableUpdateMessage.Builder update = ExportedTableUpdateMessage.newBuilder().setExportId(ticket).setSize(size);
        if (error != null) {
            update.setUpdateFailureMessage(error.getMessage());
        }
        try {
            this.responseObserver.onNext((Object)update.build());
        }
        catch (RuntimeException err) {
            log.debug().append((CharSequence)this.logPrefix).append((CharSequence)"failed to notify listener of state change: ").append((Throwable)err).endl();
            this.session.removeExportListener(this);
        }
    }

    private class ListenerImpl
    extends InstrumentedTableUpdateListener {
        private final BaseTable<?> table;
        private final int exportId;

        private ListenerImpl(BaseTable<?> table, int exportId) {
            super("ExportedTableUpdateListener (" + exportId + ")");
            this.table = table;
            this.exportId = exportId;
            this.manage((LivenessReferent)table);
        }

        public void onUpdate(TableUpdate upstream) {
            ExportedTableUpdateListener.this.sendUpdateMessage(ExportTicketHelper.wrapExportIdInTicket((int)this.exportId), this.table.size(), null);
        }

        public void onFailureInternal(Throwable error, TableListener.Entry sourceEntry) {
            ExportedTableUpdateListener.this.sendUpdateMessage(ExportTicketHelper.wrapExportIdInTicket((int)this.exportId), this.table.size(), (Throwable)ExportedTableUpdateListener.this.errorTransformer.transform(error));
        }

        @OverridingMethodsMustInvokeSuper
        public void destroy() {
            super.destroy();
            this.table.removeUpdateListener((TableUpdateListener)this);
        }
    }

    @AssistedFactory
    public static interface Factory {
        public ExportedTableUpdateListener create(SessionState var1, StreamObserver<ExportedTableUpdateMessage> var2);
    }
}

