/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.core.impl.call;

import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.TextFormat;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcFlowControl;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.impl.call.GrpcStatusHandler;

/**
 * Read-only streaming call built on top of a gRPC {@link ClientCall}.
 *
 * <p>On {@link #start(GrpcReadStream.Observer)} the call is started, the single request message is sent and
 * the client side is half-closed. Every received message is passed to the observer, and the final
 * status of the call is published through the future returned by {@code start}.
 *
 * <p>All invocations on the underlying {@link ClientCall} made by this class ({@code start},
 * {@code sendMessage}, {@code halfClose}, {@code request}, {@code cancel}) are performed while holding
 * {@link #callLock}.
 *
 * @param <ReqT> type of the request message
 * @param <RespT> type of the response messages
 */
public class ReadStreamCall<ReqT, RespT>
extends ClientCall.Listener<RespT>
implements GrpcReadStream<RespT> {
    private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);
    /** Identifier used only as a prefix in log messages. */
    private final String traceId;
    private final ClientCall<ReqT, RespT> call;
    /** Held around every invocation on {@link #call}. */
    private final Lock callLock = new ReentrantLock();
    /** Receives the raw gRPC status and trailers when the call is closed. */
    private final GrpcStatusHandler statusConsumer;
    private final ReqT request;
    private final Metadata headers;
    /** Flow-control handle; it was created with {@link #nextRequest(int)} as its callback. */
    private final GrpcFlowControl.Call flow;
    private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
    /** Observer passed to {@code start}; {@code null} until the stream is started. */
    private GrpcReadStream.Observer<RespT> consumer;

    /**
     * Creates a call that is not started yet.
     *
     * @param traceId identifier used in log messages
     * @param call underlying gRPC call
     * @param flowCtrl flow control used to create the per-call handle
     * @param req request message sent once the call is started
     * @param headers metadata passed to {@link ClientCall#start}
     * @param statusHandler receives the gRPC status and trailers when the call is closed
     */
    public ReadStreamCall(String traceId, ClientCall<ReqT, RespT> call, GrpcFlowControl flowCtrl, ReqT req, Metadata headers, GrpcStatusHandler statusHandler) {
        this.traceId = traceId;
        this.call = call;
        this.request = req;
        this.headers = headers;
        this.statusConsumer = statusHandler;
        this.flow = flowCtrl.newCall(this::nextRequest);
    }

    /**
     * Starts the call, sends the request and half-closes the client side.
     *
     * <p>Failures are never thrown to the caller. Any {@link Throwable} raised while starting,
     * including the {@link IllegalStateException} for a repeated start and the
     * {@link IllegalArgumentException} for a {@code null} observer, completes the returned future
     * exceptionally and is followed by an attempt to cancel the underlying call.
     *
     * <p>NOTE(review): the decompiler reported that it removed a self-catching try in this method;
     * compare with the upstream sources if the exact exception flow matters.
     *
     * @param observer receiver of the response messages
     * @return future completed with the final status of the call
     */
    @Override
    public CompletableFuture<Status> start(GrpcReadStream.Observer<RespT> observer) {
        this.callLock.lock();
        try {
            if (this.consumer != null) {
                throw new IllegalStateException("Read stream call is already started");
            }
            if (observer == null) {
                throw new IllegalArgumentException("Observer must be not empty");
            }
            this.consumer = observer;
            this.call.start(this, this.headers);
            if (logger.isTraceEnabled()) {
                logger.trace("ReadStreamCall[{}] --> {}", this.traceId, TextFormat.shortDebugString((Message) this.request));
            }
            this.call.sendMessage(this.request);
            // Only one request is ever sent, so the client side is closed right away.
            this.call.halfClose();
            this.flow.onStart();
        } catch (Throwable th) {
            this.statusFuture.completeExceptionally(th);
            try {
                this.call.cancel(null, th);
            } catch (Throwable ex) {
                logger.error("ReadStreamCall[{}] got exception while canceling", this.traceId, ex);
            }
        } finally {
            this.callLock.unlock();
        }
        return this.statusFuture;
    }

    /**
     * Cancels the underlying call with a {@link CancellationException} as the cause.
     */
    @Override
    public void cancel() {
        this.callLock.lock();
        try {
            this.call.cancel("Cancelled on user request", new CancellationException());
        } finally {
            this.callLock.unlock();
        }
    }

    /**
     * Flow-control callback: asks the underlying call to deliver {@code count} more messages.
     *
     * @param count number of messages to request
     */
    private void nextRequest(int count) {
        this.callLock.lock();
        try {
            this.call.request(count);
        } finally {
            this.callLock.unlock();
        }
    }

    /**
     * Passes a received message to the observer and then notifies flow control that it was read.
     *
     * <p>If either step throws an {@link Exception}, the status future is completed exceptionally
     * and the call is cancelled with that exception as the cause.
     *
     * @param message received response message
     */
    @Override
    public void onMessage(RespT message) {
        try {
            if (logger.isTraceEnabled()) {
                logger.trace("ReadStreamCall[{}] <-- {}", this.traceId, TextFormat.shortDebugString((Message) message));
            }
            this.consumer.onNext(message);
            this.flow.onMessageRead();
        } catch (Exception ex) {
            this.statusFuture.completeExceptionally(ex);
            try {
                this.callLock.lock();
                try {
                    this.call.cancel("Canceled by exception from observer", ex);
                } finally {
                    this.callLock.unlock();
                }
            } catch (Throwable th) {
                logger.error("ReadStreamCall[{}] got exception while canceling", this.traceId, th);
            }
        }
    }

    /**
     * Reports the gRPC status to the status handler and completes the status future.
     *
     * <p>The future is completed in a {@code finally} block, so it is completed even when the status
     * handler throws; the handler's exception still propagates to the caller of this method.
     * Completion is a no-op if the future was already completed exceptionally earlier.
     *
     * @param status final gRPC status of the call
     * @param trailers trailing metadata, may be {@code null}
     */
    @Override
    public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
        if (logger.isTraceEnabled()) {
            logger.trace("ReadStreamCall[{}] closed with status {}", this.traceId, status);
        }
        try {
            this.statusConsumer.accept(status, trailers);
        } finally {
            if (status.isOk()) {
                this.statusFuture.complete(Status.SUCCESS);
            } else {
                this.statusFuture.complete(GrpcStatuses.toStatus(status));
            }
        }
    }
}

