/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.core.impl.call;

import io.grpc.ClientCall;
import io.grpc.Metadata;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.impl.auth.AuthCallOptions;
import tech.ydb.core.impl.call.GrpcStatusHandler;
import tech.ydb.core.impl.call.ReadStreamCall;

/**
 * Adapter that exposes a gRPC {@link ClientCall} as a {@link GrpcReadWriteStream}.
 *
 * <p>The instance registers itself as the call listener: every received message is forwarded to
 * the observer passed to {@link #start(GrpcReadStream.Observer)}, and the final gRPC status is
 * published through the future returned by that method.
 *
 * <p>Outgoing messages are written directly while the call reports itself ready; otherwise they
 * are kept in an internal FIFO queue and written from {@link #onReady()} or from a later
 * {@link #sendNext(Object)}.
 *
 * <p>Every interaction with the underlying call, and every access to the pending queue, happens
 * while holding the monitor of the call object.
 *
 * @param <R> type of the messages read from the stream
 * @param <W> type of the messages written to the stream
 */
public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements GrpcReadWriteStream<R, W> {
    private static final Logger logger = LoggerFactory.getLogger(ReadWriteStreamCall.class);

    private final ClientCall<W, R> call;
    private final GrpcStatusHandler statusConsumer;
    private final Metadata headers;
    private final AuthCallOptions callOptions;

    /** Completed from {@link #onClose}, or exceptionally when starting or the observer fails. */
    private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
    /** Holds the observer; a non-null value also marks the stream as already started. */
    private final AtomicReference<GrpcReadStream.Observer<R>> observerReference = new AtomicReference<>();
    /** Messages accepted by {@link #sendNext} while the call was not ready; guarded by {@code call}. */
    private final Queue<W> messagesQueue = new ArrayDeque<>();

    /**
     * Creates a stream wrapper around a not yet started call.
     *
     * @param call underlying gRPC call
     * @param headers request headers, {@code null} is replaced with empty metadata
     * @param callOptions source of the auth token returned by {@link #authToken()}
     * @param statusConsumer handler notified with the final gRPC status and trailers
     */
    public ReadWriteStreamCall(ClientCall<W, R> call, Metadata headers, AuthCallOptions callOptions,
            GrpcStatusHandler statusConsumer) {
        this.call = call;
        this.headers = headers != null ? headers : new Metadata();
        this.statusConsumer = statusConsumer;
        this.callOptions = callOptions;
    }

    /**
     * Returns the token supplied by the call options given to the constructor.
     */
    @Override
    public String authToken() {
        return this.callOptions.getToken();
    }

    /**
     * Starts the call, registers the observer and requests the first message.
     *
     * <p>If starting fails, the call is canceled (a failure of the cancellation itself is only
     * logged) and the returned future is completed exceptionally with the original error.
     *
     * @param observer receiver of the incoming messages
     * @return future completed with the final status of the stream
     * @throws IllegalStateException if the stream has already been started
     */
    @Override
    public CompletableFuture<Status> start(GrpcReadStream.Observer<R> observer) {
        if (!this.observerReference.compareAndSet(null, observer)) {
            throw new IllegalStateException("Read stream call is already started");
        }

        synchronized (this.call) {
            try {
                this.call.start(this, this.headers);
                this.call.request(1);
            } catch (Throwable t) {
                try {
                    this.call.cancel(null, t);
                } catch (Throwable ex) {
                    logger.error("Exception encountered while canceling the read write stream call", ex);
                }
                this.statusFuture.completeExceptionally(t);
            }
        }

        return this.statusFuture;
    }

    /**
     * Sends a message to the stream, or queues it if the call cannot accept it right now.
     *
     * <p>Queued messages are always written before the new one, so the order of
     * {@code sendNext} invocations is preserved.
     *
     * @param message message to write
     */
    @Override
    public void sendNext(W message) {
        synchronized (this.call) {
            if (flush()) {
                this.call.sendMessage(message);
            } else {
                this.messagesQueue.add(message);
            }
        }
    }

    /**
     * Writes queued messages while the call reports itself ready.
     * Must be invoked while holding the monitor of {@code call}.
     *
     * @return {@code true} if the queue was drained and the call was ready at the last check,
     *         {@code false} if the call stopped being ready (messages may remain queued)
     */
    private boolean flush() {
        while (this.call.isReady()) {
            W next = this.messagesQueue.poll();
            if (next == null) {
                return true;
            }
            this.call.sendMessage(next);
        }
        return false;
    }

    /**
     * Cancels the call with a {@link CancellationException} as the cause.
     */
    @Override
    public void cancel() {
        synchronized (this.call) {
            this.call.cancel("Cancelled on user request", new CancellationException());
        }
    }

    /**
     * Forwards a received message to the observer and then requests the next one.
     *
     * <p>If the observer (or the request for the next message) throws an {@link Exception},
     * the status future is completed exceptionally and the call is canceled; a failure of the
     * cancellation itself is only logged.
     *
     * @param message message received from the stream
     */
    @Override
    public void onMessage(R message) {
        try {
            this.observerReference.get().onNext(message);
            // The next message is requested only after the observer has handled the current one.
            synchronized (this.call) {
                this.call.request(1);
            }
        } catch (Exception ex) {
            this.statusFuture.completeExceptionally(ex);
            try {
                synchronized (this.call) {
                    this.call.cancel("Canceled by exception from observer", ex);
                }
            } catch (Throwable th) {
                logger.error("Exception encountered while canceling the read write stream call", th);
            }
        }
    }

    /**
     * Writes the messages that were queued while the call was not ready.
     */
    @Override
    public void onReady() {
        synchronized (this.call) {
            flush();
        }
    }

    /**
     * Half-closes the call, i.e. signals that no more messages will be written.
     * Messages still waiting in the internal queue are not flushed by this method.
     */
    @Override
    public void close() {
        synchronized (this.call) {
            this.call.halfClose();
        }
    }

    /**
     * Passes the final gRPC status to the status handler and completes the status future:
     * with {@link Status#SUCCESS} for an OK status, otherwise with the converted gRPC status.
     *
     * @param status final gRPC status of the call
     * @param trailers trailing metadata, may be {@code null}
     */
    @Override
    public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
        this.statusConsumer.accept(status, trailers);
        if (status.isOk()) {
            this.statusFuture.complete(Status.SUCCESS);
        } else {
            this.statusFuture.complete(GrpcStatuses.toStatus(status));
        }
    }
}

