/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.server.object;

import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.rpc.Code;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.liveness.ReleasableLivenessManager;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.plugin.type.ObjectCommunicationException;
import io.deephaven.plugin.type.ObjectType;
import io.deephaven.plugin.type.ObjectTypeLookup;
import io.deephaven.proto.backplane.grpc.ClientData;
import io.deephaven.proto.backplane.grpc.FetchObjectRequest;
import io.deephaven.proto.backplane.grpc.FetchObjectResponse;
import io.deephaven.proto.backplane.grpc.ObjectServiceGrpc;
import io.deephaven.proto.backplane.grpc.ServerData;
import io.deephaven.proto.backplane.grpc.StreamRequest;
import io.deephaven.proto.backplane.grpc.StreamResponse;
import io.deephaven.proto.backplane.grpc.TypedTicket;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.grpc.GrpcErrorHelper;
import io.deephaven.server.object.TypeLookup;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.function.ThrowingRunnable;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.jetbrains.annotations.NotNull;

public class ObjectServiceGrpcImpl
extends ObjectServiceGrpc.ObjectServiceImplBase {
    private final SessionService sessionService;
    private final TicketRouter ticketRouter;
    private final ObjectTypeLookup objectTypeLookup;
    private final TypeLookup typeLookup;
    private final SessionService.ErrorTransformer errorTransformer;

    @Inject
    public ObjectServiceGrpcImpl(SessionService sessionService, TicketRouter ticketRouter, ObjectTypeLookup objectTypeLookup, TypeLookup typeLookup, SessionService.ErrorTransformer errorTransformer) {
        this.sessionService = Objects.requireNonNull(sessionService);
        this.ticketRouter = Objects.requireNonNull(ticketRouter);
        this.objectTypeLookup = Objects.requireNonNull(objectTypeLookup);
        this.typeLookup = Objects.requireNonNull(typeLookup);
        this.errorTransformer = Objects.requireNonNull(errorTransformer);
    }

    public void fetchObject(@NotNull FetchObjectRequest request, final @NotNull StreamObserver<FetchObjectResponse> responseObserver) {
        SessionState session = this.sessionService.getCurrentSession();
        final String type = request.getSourceId().getType();
        if (type.isEmpty()) {
            throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"No type supplied");
        }
        if (request.getSourceId().getTicket().getTicket().isEmpty()) {
            throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"No ticket supplied");
        }
        String description = "ObjectService#fetchObject(object=" + this.ticketRouter.getLogNameFor(request.getSourceId().getTicket(), "sourceId") + ")";
        QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery((String)description, (String)session.getSessionId(), (QueryPerformanceNugget.Factory)QueryPerformanceNugget.DEFAULT_FACTORY);
        try (SafeCloseable ignored = queryPerformanceRecorder.startQuery();){
            SessionState.ExportObject object = this.ticketRouter.resolve(session, request.getSourceId().getTicket(), "sourceId");
            session.nonExport().queryPerformanceRecorder(queryPerformanceRecorder).require(object).onError(responseObserver).onSuccess(response -> GrpcUtil.safelyOnNextAndComplete((StreamObserver)responseObserver, (Object)response)).submit(() -> {
                Object o = object.get();
                ObjectType objectTypeInstance = this.getObjectTypeInstance(type, o);
                final AtomicReference singleResponse = new AtomicReference();
                final AtomicBoolean isClosed = new AtomicBoolean(false);
                StreamObserver<StreamResponse> wrappedResponseObserver = new StreamObserver<StreamResponse>(){

                    public void onNext(StreamResponse value) {
                        singleResponse.set(FetchObjectResponse.newBuilder().setType(type).setData(value.getData().getPayload()).addAllTypedExportIds((Iterable)value.getData().getExportedReferencesList()).build());
                    }

                    public void onError(Throwable t) {
                        responseObserver.onError(t);
                    }

                    public void onCompleted() {
                        isClosed.set(true);
                    }
                };
                PluginMessageSender connection = new PluginMessageSender(wrappedResponseObserver, session);
                objectTypeInstance.clientConnection(o, (ObjectType.MessageStream)connection);
                FetchObjectResponse message = (FetchObjectResponse)singleResponse.get();
                if (message == null) {
                    connection.onClose();
                    throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Plugin didn't send a response before returning from clientConnection()");
                }
                if (!isClosed.get()) {
                    connection.onClose();
                    throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Plugin didn't close response, use MessageStream instead for this object");
                }
                return message;
            });
        }
    }

    public StreamObserver<StreamRequest> messageStream(StreamObserver<StreamResponse> responseObserver) {
        SessionState session = this.sessionService.getCurrentSession();
        return new SendMessageObserver(session, responseObserver);
    }

    @NotNull
    private ObjectType getObjectTypeInstance(String expectedType, Object object) {
        Optional o = this.objectTypeLookup.findObjectType(object);
        if (o.isEmpty()) {
            throw Exceptions.statusRuntimeException((Code)Code.NOT_FOUND, (String)String.format("No ObjectType found, expected type '%s'", expectedType));
        }
        ObjectType objectType = (ObjectType)o.get();
        if (!expectedType.equals(objectType.name())) {
            throw Exceptions.statusRuntimeException((Code)Code.FAILED_PRECONDITION, (String)String.format("Unexpected ObjectType, expected type '%s', actual type '%s'", expectedType, objectType.name()));
        }
        return objectType;
    }

    private static void cleanup(Collection<SessionState.ExportObject<?>> exports, Throwable t) {
        for (SessionState.ExportObject<?> export : exports) {
            try {
                export.release();
            }
            catch (Throwable inner) {
                t.addSuppressed(inner);
            }
        }
    }

    private final class SendMessageObserver
    implements StreamObserver<StreamRequest> {
        private final SessionState session;
        private final StreamObserver<StreamResponse> responseObserver;
        private boolean seenConnect = false;
        private ObjectType.MessageStream messageStream;
        private final Queue<EnqueuedStreamOperation> operations = new ConcurrentLinkedQueue<EnqueuedStreamOperation>();
        private final AtomicReference<EnqueuedState> runState = new AtomicReference<EnqueuedState>(EnqueuedState.WAITING);

        private SendMessageObserver(SessionState session, StreamObserver<StreamResponse> responseObserver) {
            this.session = session;
            this.responseObserver = responseObserver;
        }

        public void onNext(StreamRequest request) {
            GrpcErrorHelper.checkHasOneOf((Message)request, "message");
            if (request.hasConnect()) {
                if (this.seenConnect) {
                    throw Exceptions.statusRuntimeException((Code)Code.FAILED_PRECONDITION, (String)"Already sent a connect request, cannot send another");
                }
                this.seenConnect = true;
                TypedTicket typedTicket2 = request.getConnect().getSourceId();
                String type = typedTicket2.getType();
                if (type.isEmpty()) {
                    throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"No type supplied");
                }
                if (typedTicket2.getTicket().getTicket().isEmpty()) {
                    throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"No ticket supplied");
                }
                SessionState.ExportObject object = ObjectServiceGrpcImpl.this.ticketRouter.resolve(this.session, typedTicket2.getTicket(), "sourceId");
                this.runOrEnqueue(Collections.singleton(object), () -> {
                    Object o = object.get();
                    ObjectType objectType = ObjectServiceGrpcImpl.this.getObjectTypeInstance(type, o);
                    PluginMessageSender clientConnection = new PluginMessageSender(this.responseObserver, this.session);
                    this.messageStream = objectType.clientConnection(o, (ObjectType.MessageStream)clientConnection);
                });
            } else if (request.hasData()) {
                List referenceObjects;
                if (!this.seenConnect) {
                    throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Data message sent before Connect message");
                }
                ClientData data = request.getData();
                LivenessScope exportScope = new LivenessScope();
                try (SafeCloseable ignored = LivenessScopeStack.open((ReleasableLivenessManager)exportScope, (boolean)false);){
                    referenceObjects = data.getReferencesList().stream().map(typedTicket -> ObjectServiceGrpcImpl.this.ticketRouter.resolve(this.session, typedTicket.getTicket(), "ticket")).collect(Collectors.toList());
                }
                this.runOrEnqueue(referenceObjects, () -> {
                    Object[] objs;
                    try {
                        objs = referenceObjects.stream().map(SessionState.ExportObject::get).toArray();
                    }
                    finally {
                        exportScope.release();
                    }
                    this.messageStream.onData(data.getPayload().asReadOnlyByteBuffer(), objs);
                });
            }
        }

        private void runOrEnqueue(Collection<? extends SessionState.ExportObject<?>> dependencies, StreamOperation operation) {
            this.operations.add(new EnqueuedStreamOperation(dependencies, operation));
            this.doWork();
        }

        private void doWork() {
            EnqueuedStreamOperation next = this.operations.peek();
            if (next != null && this.runState.compareAndSet(EnqueuedState.WAITING, EnqueuedState.RUNNING)) {
                EnqueuedStreamOperation actualNext = this.operations.poll();
                Assert.eq((Object)next, (String)"next", (Object)actualNext, (String)"actualNext");
                next.run();
            }
        }

        public void onError(Throwable t) {
            this.runState.set(EnqueuedState.CLOSED);
            GrpcUtil.safelyError(this.responseObserver, (StatusRuntimeException)ObjectServiceGrpcImpl.this.errorTransformer.transform(t));
            if (this.messageStream != null) {
                this.closeMessageStream();
            }
            this.operations.clear();
        }

        private void closeMessageStream() {
            try {
                this.messageStream.onClose();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }

        public void onCompleted() {
            this.runOrEnqueue(Collections.emptyList(), () -> {
                this.runState.set(EnqueuedState.CLOSED);
                GrpcUtil.safelyComplete(this.responseObserver);
                if (this.messageStream != null) {
                    this.closeMessageStream();
                }
            });
        }

        class EnqueuedStreamOperation {
            private final StreamOperation wrapped;
            private final SessionState.ExportBuilder<Object> nonExport;

            EnqueuedStreamOperation(Collection<? extends SessionState.ExportObject<?>> dependencies, StreamOperation wrapped) {
                this.wrapped = wrapped;
                this.nonExport = SendMessageObserver.this.session.nonExport().onErrorHandler(SendMessageObserver.this::onError).require(List.copyOf(dependencies));
            }

            public void run() {
                this.nonExport.submit(() -> {
                    if (SendMessageObserver.this.runState.get() == EnqueuedState.CLOSED) {
                        return;
                    }
                    try {
                        this.wrapped.run();
                    }
                    catch (ObjectCommunicationException e) {
                        throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Error performing MessageStream operation");
                    }
                    if (SendMessageObserver.this.runState.compareAndSet(EnqueuedState.RUNNING, EnqueuedState.WAITING)) {
                        SendMessageObserver.this.doWork();
                    }
                });
            }
        }
    }

    private final class PluginMessageSender
    implements ObjectType.MessageStream {
        private final StreamObserver<StreamResponse> responseObserver;
        private final SessionState sessionState;

        public PluginMessageSender(StreamObserver<StreamResponse> responseObserver, SessionState sessionState) {
            this.responseObserver = responseObserver;
            this.sessionState = sessionState;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onData(ByteBuffer message, Object[] references) throws ObjectCommunicationException {
            ArrayList exports = new ArrayList(references.length);
            try {
                ServerData.Builder payload = ServerData.newBuilder().setPayload(ByteString.copyFrom((ByteBuffer)message));
                for (Object reference : references) {
                    String type = ObjectServiceGrpcImpl.this.typeLookup.type(reference).orElse(null);
                    SessionState.ExportObject<Object> exportObject = this.sessionState.newServerSideExport(reference);
                    exports.add(exportObject);
                    TypedTicket typedTicket = this.ticketForExport(exportObject, type);
                    payload.addExportedReferences(typedTicket);
                }
                StreamResponse.Builder responseBuilder = StreamResponse.newBuilder().setData(payload);
                StreamResponse response = responseBuilder.build();
                StreamObserver<StreamResponse> streamObserver = this.responseObserver;
                synchronized (streamObserver) {
                    this.responseObserver.onNext((Object)response);
                }
            }
            catch (Throwable t) {
                ObjectServiceGrpcImpl.cleanup(exports, t);
                throw new ObjectCommunicationException(t);
            }
        }

        private TypedTicket ticketForExport(SessionState.ExportObject<?> exportObject, String type) {
            TypedTicket.Builder builder = TypedTicket.newBuilder().setTicket(exportObject.getExportId());
            if (type != null) {
                builder.setType(type);
            }
            return builder.build();
        }

        public void onClose() {
            GrpcUtil.safelyComplete(this.responseObserver);
        }
    }

    private static enum EnqueuedState {
        WAITING,
        RUNNING,
        CLOSED;

    }

    @FunctionalInterface
    static interface StreamOperation
    extends ThrowingRunnable<ObjectCommunicationException> {
    }
}

