/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.server.browserstreaming;

import com.google.rpc.Code;
import io.deephaven.base.RAPriQueue;
import io.deephaven.base.verify.Assert;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.browserstreaming.StreamData;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.util.GrpcServiceOverrideBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;

public class BrowserStream<T>
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(BrowserStream.class);

    // --- immutable configuration, assigned once in the constructor ---

    /** Delivery policy consulted by {@code onMessageReceived}. */
    private final Mode mode;
    /** Prefix placed at the start of every log line emitted by this stream. */
    private final String logIdentity;
    /** Session this stream registers itself with as an on-close callback. */
    private final SessionState session;
    /** Sink that receives delivered messages and terminal events. */
    private final Marshaller<T> marshaller;

    // --- mutable state; read and written inside synchronized (this) sections of onMessageReceived ---

    /** One past the sequence number most recently accepted for delivery (starts at 0). */
    private long nextSeq;
    /** True while some caller is inside the delivery loop of {@code onMessageReceived}. */
    private boolean processingMessage;
    /** Sequence number of the half-close message, or -1 while none has been received. */
    private long halfClosedSeq = -1L;
    /** IN_ORDER mode only: lazily created queue of deferred messages, ordered by sequence number. */
    private RAPriQueue<Message<T>> pendingSeq;
    /** MOST_RECENT mode only: stream data of the message parked while another one is being processed. */
    private StreamData queuedStreamData;
    /** MOST_RECENT mode only: payload that belongs to {@link #queuedStreamData}. */
    private T queuedMessage;

    /**
     * Builds a {@link Factory} whose streams forward everything they deliver to the request observer obtained from
     * {@code bidiDelegate}.
     *
     * <p>
     * Each call to {@link Factory#create} invokes {@code bidiDelegate.doInvoke(responseObserver)} once and wraps the
     * returned request observer in a marshaller owned by the new stream.
     *
     * @param mode the delivery policy every created stream will use
     * @param bidiDelegate supplies the request observer for a given response observer
     * @param <ReqT> request message type
     * @param <RespT> response message type
     * @return a factory producing {@code BrowserStream} instances bound to {@code mode} and {@code bidiDelegate}
     */
    public static <ReqT, RespT> Factory<ReqT, RespT> factory(Mode mode, final GrpcServiceOverrideBuilder.BidiDelegate<ReqT, RespT> bidiDelegate) {
        return (session, responseObserver) -> {
            final Marshaller<ReqT> forwarding = new Marshaller<ReqT>() {
                // Obtained eagerly, i.e. before the BrowserStream itself is constructed.
                private final StreamObserver<ReqT> requestObserver = bidiDelegate.doInvoke(responseObserver);

                @Override
                public void onMessageReceived(ReqT message) {
                    requestObserver.onNext(message);
                }

                @Override
                public void onCancel() {
                    // The same CANCELLED status is reported to both directions of the call.
                    final StatusRuntimeException canceled =
                            Exceptions.statusRuntimeException(Code.CANCELLED, "Stream canceled on the server");
                    GrpcUtil.safelyError(responseObserver, canceled);
                    GrpcUtil.safelyError(requestObserver, canceled);
                }

                @Override
                public void onError(Throwable err) {
                    requestObserver.onError(err);
                }

                @Override
                public void onCompleted() {
                    requestObserver.onCompleted();
                }
            };
            return new BrowserStream<>(mode, session, forwarding);
        };
    }

    private BrowserStream(Mode mode, SessionState session, Marshaller<T> marshaller) {
        this.mode = mode;
        this.logIdentity = "BrowserStream(" + Integer.toHexString(System.identityHashCode(this)) + "): ";
        this.session = session;
        this.marshaller = marshaller;
        this.session.addOnCloseCallback(this);
    }

    /**
     * Accepts one incoming message and, when appropriate, delivers it (and any messages that became deliverable in
     * the meantime) to the marshaller.
     *
     * <p>
     * Admission is decided under the monitor of this object:
     * <ul>
     * <li>{@link Mode#IN_ORDER}: a message whose sequence is not the next expected one, or that arrives while another
     * message is being processed, is parked in a priority queue ordered by sequence.</li>
     * <li>{@link Mode#MOST_RECENT}: a message older than what has been seen is dropped; a message that arrives while
     * another is being processed replaces the single parked message.</li>
     * </ul>
     * The caller whose message is admitted becomes the processor: it runs the delivery loop, invoking the marshaller
     * outside the monitor, until no further deliverable message is parked. A message flagged as half close completes
     * the stream instead of being forwarded.
     *
     * <p>
     * If the marshaller throws a {@link RuntimeException}, {@link #onError(RuntimeException)} is invoked and the
     * method returns without clearing the processing flag, so later messages are only parked or dropped.
     *
     * @param message the payload to deliver
     * @param streamData sequencing metadata for {@code message}; must not be null
     * @throws StatusRuntimeException with {@code ABORTED} if the sequence is greater than an already recorded half
     *         close, {@code INVALID_ARGUMENT} if a second half close is received, or {@code OUT_OF_RANGE} if an
     *         {@code IN_ORDER} stream receives a sequence lower than the next expected one
     */
    public void onMessageReceived(T message, StreamData streamData) {
        synchronized (this) {
            // Widen once so that every comparison and the "+ 1" below are performed in long arithmetic.
            final long recvSeq = streamData.getSequence();

            if (halfClosedSeq != -1L && recvSeq > halfClosedSeq) {
                throw Exceptions.statusRuntimeException(Code.ABORTED,
                        "Sequence sent after half close: closed seq=" + halfClosedSeq + " recv seq=" + recvSeq);
            }
            if (streamData.isHalfClose()) {
                if (halfClosedSeq != -1L) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT,
                            "Already half closed: closed seq=" + halfClosedSeq + " recv seq=" + recvSeq);
                }
                halfClosedSeq = recvSeq;
            }

            if (mode == Mode.IN_ORDER) {
                if (recvSeq < nextSeq) {
                    throw Exceptions.statusRuntimeException(Code.OUT_OF_RANGE,
                            "Duplicate sequence sent: next seq=" + nextSeq + " recv seq=" + recvSeq);
                }
                boolean queueMsg = false;
                if (processingMessage) {
                    queueMsg = true;
                    log.debug().append(logIdentity).append("queueing; next seq=").append(nextSeq)
                            .append(" recv seq=").append(recvSeq).endl();
                } else if (recvSeq != nextSeq) {
                    queueMsg = true;
                    log.debug().append(logIdentity).append("queueing; waiting seq=").append(nextSeq)
                            .append(" recv seq=").append(recvSeq).endl();
                }
                if (queueMsg) {
                    if (pendingSeq == null) {
                        pendingSeq = new RAPriQueue<>(1, MessageInfoQueueAdapter.getInstance(), Message.class);
                    }
                    pendingSeq.enter(new Message<>(message, streamData));
                    return;
                }
            } else { // Mode.MOST_RECENT
                final boolean alreadyHandled = recvSeq < nextSeq;
                final boolean duplicateOfInFlight = recvSeq == nextSeq && processingMessage;
                final boolean olderThanQueued =
                        queuedStreamData != null && recvSeq < queuedStreamData.getSequence();
                if (alreadyHandled || duplicateOfInFlight || olderThanQueued) {
                    log.debug().append(logIdentity).append("dropping; next seq=").append(nextSeq)
                            .append(" queued seq=").append(queuedStreamData != null ? queuedStreamData.getSequence() : -1)
                            .append(" recv seq=").append(recvSeq).endl();
                    return;
                }
                if (processingMessage) {
                    // Newest message seen so far: it replaces whatever was parked before.
                    log.debug().append(logIdentity).append("queueing; processing seq=").append(nextSeq)
                            .append(" recv seq=").append(recvSeq).endl();
                    queuedStreamData = streamData;
                    queuedMessage = message;
                    return;
                }
            }

            // This caller becomes the processor for the stream.
            nextSeq = recvSeq + 1L;
            processingMessage = true;
        }

        while (true) {
            synchronized (this) {
                if (streamData.isHalfClose()) {
                    // The half-close message terminates the stream; its payload is not forwarded.
                    onComplete();
                    processingMessage = false;
                    return;
                }
            }

            // Delivery happens outside the monitor so that other callers can park messages meanwhile.
            try {
                marshaller.onMessageReceived(message);
            } catch (RuntimeException e) {
                // processingMessage is deliberately left untouched here (original behavior).
                onError(e);
                return;
            }

            synchronized (this) {
                if (mode == Mode.IN_ORDER) {
                    final Message<T> top = pendingSeq == null ? null : pendingSeq.top();
                    message = top == null ? null : top.getMessage();
                    streamData = top == null ? null : top.getStreamData();
                    if (streamData == null || streamData.getSequence() != nextSeq) {
                        // Nothing parked, or the lowest parked sequence is not yet the expected one.
                        processingMessage = false;
                        break;
                    }
                    Assert.eq(pendingSeq.removeTop().getMessage(), "pendingSeq.remoteTop()", message, "message");
                } else { // Mode.MOST_RECENT
                    message = queuedMessage;
                    streamData = queuedStreamData;
                    if (message == null) {
                        processingMessage = false;
                        break;
                    }
                    // Clear BOTH halves of the parked slot. Clearing only the stream data would leave the payload
                    // behind, and the next pass would pair it with a null streamData and fail with an NPE.
                    queuedMessage = null;
                    queuedStreamData = null;
                }

                log.debug().append(logIdentity).append("processing queued seq=").append(streamData.getSequence()).endl();
                // Cast before adding so that Integer.MAX_VALUE does not wrap around.
                nextSeq = (long) streamData.getSequence() + 1L;
            }
        }
    }

    /**
     * Reports a failure to the marshaller, at most once per registration with the session.
     *
     * <p>
     * The error is logged and forwarded only if this stream could still be removed from the session's on-close
     * callbacks; otherwise the call does nothing.
     *
     * @param e the failure to report
     */
    public void onError(RuntimeException e) {
        final boolean wasRegistered = session.removeOnCloseCallback(this);
        if (!wasRegistered) {
            return;
        }
        log.error().append(logIdentity).append("closing browser stream on unexpected exception: ").append(e).endl();
        marshaller.onError(e);
    }

    /**
     * Cancels the stream by notifying the marshaller.
     *
     * <p>
     * NOTE(review): presumably invoked by the session through the on-close callback registered in the constructor;
     * this method does not deregister that callback itself.
     */
    @Override
    public void close() {
        marshaller.onCancel();
    }

    /**
     * Signals normal completion to the marshaller, provided this stream could still be removed from the session's
     * on-close callbacks; otherwise does nothing.
     */
    private void onComplete() {
        final boolean wasRegistered = session.removeOnCloseCallback(this);
        if (!wasRegistered) {
            return;
        }
        log.debug().append(logIdentity).append("browser stream completed").endl();
        marshaller.onCompleted();
    }

    /** Delivery policy applied to incoming messages. */
    public enum Mode {
        /**
         * Messages are delivered strictly by ascending sequence number; messages that arrive early are parked until
         * their turn, and a sequence lower than the next expected one is rejected.
         */
        IN_ORDER,

        /**
         * Stale messages are dropped; while one message is being processed only the newest later arrival is kept.
         */
        MOST_RECENT
    }

    /**
     * Creates a {@link BrowserStream} for one session and response observer.
     *
     * @param <ReqT> request message type
     * @param <RespT> response message type
     */
    @FunctionalInterface
    public interface Factory<ReqT, RespT> {
        /**
         * @param session the session the new stream belongs to
         * @param responseObserver observer for the responses of the call
         * @return a new stream
         */
        BrowserStream<ReqT> create(SessionState session, StreamObserver<RespT> responseObserver);
    }

    private static interface Marshaller<T> {
        public void onMessageReceived(T var1);

        public void onCancel();

        public void onError(Throwable var1);

        public void onCompleted();
    }

    /**
     * Stateless {@link RAPriQueue.Adapter} that orders {@link Message} instances by ascending sequence number and
     * stores the queue position inside the message itself.
     */
    private static class MessageInfoQueueAdapter
    implements RAPriQueue.Adapter<Message<?>> {
        private static final MessageInfoQueueAdapter INSTANCE = new MessageInfoQueueAdapter();

        private MessageInfoQueueAdapter() {
        }

        /**
         * Returns the shared adapter, typed for the caller's message type.
         *
         * @param <T> the concrete message type of the queue using the adapter
         * @return the singleton instance
         */
        @SuppressWarnings("unchecked")
        private static <T extends Message<?>> RAPriQueue.Adapter<T> getInstance() {
            // Safe: the adapter only touches members declared on Message itself (stream data and pos), so it works
            // for every parameterization. The explicit cast is required because generic types are invariant.
            return (RAPriQueue.Adapter<T>) INSTANCE;
        }

        /** @return true if {@code a} carries a strictly lower sequence number than {@code b} */
        public boolean less(Message<?> a, Message<?> b) {
            return a.getStreamData().getSequence() < b.getStreamData().getSequence();
        }

        /** Records the queue position {@code pos} on the message. */
        public void setPos(Message<?> mi, int pos) {
            mi.pos = pos;
        }

        /** @return the queue position previously recorded on the message */
        public int getPos(Message<?> mi) {
            return mi.pos;
        }
    }

    /**
     * A payload together with its stream data, as parked in the pending queue.
     *
     * @param <T> payload type
     */
    private static class Message<T> {
        private final T message;
        private final StreamData streamData;
        /** Queue position; read and written by {@code MessageInfoQueueAdapter}. */
        private int pos;

        public Message(T message, StreamData streamData) {
            this.streamData = streamData;
            this.message = message;
        }

        /** @return the payload given at construction */
        public T getMessage() {
            return message;
        }

        /** @return the stream data given at construction */
        public StreamData getStreamData() {
            return streamData;
        }
    }
}

