/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.coordination.impl;

import com.google.protobuf.ByteString;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.TextFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.coordination.impl.Rpc;
import tech.ydb.coordination.impl.StreamMsg;
import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.coordination.SessionRequest;
import tech.ydb.proto.coordination.SessionResponse;

class Stream
implements GrpcReadStream.Observer<SessionResponse> {
    private static final int SHUTDOWN_TIMEOUT_MS = 1000;
    private static final Logger logger = LoggerFactory.getLogger(Stream.class);
    private final ScheduledExecutorService scheduler;
    private final GrpcReadWriteStream<SessionResponse, SessionRequest> stream;
    private final CompletableFuture<Status> stopFuture = new CompletableFuture();
    private final CompletableFuture<Result<Long>> startFuture = new CompletableFuture();
    private final Map<Long, StreamMsg<?>> messages = new ConcurrentHashMap();

    Stream(Rpc rpc) {
        this.scheduler = rpc.getScheduler();
        this.stream = rpc.createSession(GrpcRequestSettings.newBuilder().build());
    }

    public CompletableFuture<Status> startStream() {
        this.stream.start((GrpcReadStream.Observer)this).whenComplete((status, th) -> {
            if (th != null) {
                this.startFuture.completeExceptionally((Throwable)th);
                this.stopFuture.completeExceptionally((Throwable)th);
            }
            if (status != null) {
                this.startFuture.complete((Result<Long>)Result.fail((Status)(status.isSuccess() ? Status.of((StatusCode)StatusCode.BAD_REQUEST) : status)));
                this.stopFuture.complete((Status)status);
            }
        });
        return this.stopFuture;
    }

    public Collection<StreamMsg<?>> getMessages() {
        return this.messages.values();
    }

    public void cancelStream() {
        logger.trace("stream {} cancel stream", (Object)this.hashCode());
        this.stream.cancel();
    }

    public CompletableFuture<Result<Long>> sendSessionStart(long reqId, String node, Duration timeout, ByteString key) {
        SessionRequest startMsg = SessionRequest.newBuilder().setSessionStart(SessionRequest.SessionStart.newBuilder().setSessionId(reqId).setPath(node).setTimeoutMillis(timeout.toMillis()).setProtectionKey(key).build()).build();
        logger.trace("stream {} send session start msg {}", (Object)this.hashCode(), (Object)reqId);
        this.stream.sendNext((Object)startMsg);
        return this.startFuture;
    }

    public CompletableFuture<Status> stop() {
        if (this.stopFuture.isDone()) {
            return this.stopFuture;
        }
        SessionRequest stopMsg = SessionRequest.newBuilder().setSessionStop(SessionRequest.SessionStop.newBuilder().build()).build();
        logger.trace("stream {} send session stop msg", (Object)this.hashCode());
        this.stream.sendNext((Object)stopMsg);
        ScheduledFuture<?> timer = this.scheduler.schedule(this::cancelStream, 1000L, TimeUnit.MILLISECONDS);
        this.stopFuture.whenComplete((st, ex) -> {
            if (!timer.isDone()) {
                timer.cancel(true);
            }
        });
        return this.stopFuture;
    }

    public void sendMsg(long requestId, StreamMsg<?> msg) {
        StreamMsg<?> oldMsg = this.messages.put(requestId, msg);
        SessionRequest request = msg.makeRequest(requestId);
        logger.trace("stream {} send message {}", (Object)this.hashCode(), (Object)TextFormat.shortDebugString((MessageOrBuilder)request));
        this.stream.sendNext((Object)request);
        if (oldMsg != null) {
            oldMsg.handleError(Status.of((StatusCode)StatusCode.CLIENT_CANCELLED));
        }
    }

    public void onNext(SessionResponse resp) {
        if (resp.hasFailure()) {
            this.onFail(resp.getFailure());
            return;
        }
        if (resp.hasSessionStarted()) {
            this.onSessionStarted(resp.getSessionStarted());
            return;
        }
        if (resp.hasSessionStopped()) {
            this.onSessionStopped(resp.getSessionStopped());
            return;
        }
        if (resp.hasPing()) {
            this.onPing(resp.getPing());
            return;
        }
        if (resp.hasPong()) {
            long opaque = resp.getPong().getOpaque();
            logger.trace("stream {} got pong msg {}", (Object)this.hashCode(), (Object)Long.toUnsignedString(opaque));
            return;
        }
        if (resp.hasAcquireSemaphorePending()) {
            long reqId = resp.getAcquireSemaphorePending().getReqId();
            logger.trace("stream {} got acquire semaphore pending msg {}", (Object)this.hashCode(), (Object)reqId);
            return;
        }
        if (resp.hasCreateSemaphoreResult()) {
            this.onNextMessage(resp.getCreateSemaphoreResult().getReqId(), resp);
        }
        if (resp.hasDeleteSemaphoreResult()) {
            this.onNextMessage(resp.getDeleteSemaphoreResult().getReqId(), resp);
        }
        if (resp.hasUpdateSemaphoreResult()) {
            this.onNextMessage(resp.getUpdateSemaphoreResult().getReqId(), resp);
        }
        if (resp.hasDescribeSemaphoreResult()) {
            this.onNextMessage(resp.getDescribeSemaphoreResult().getReqId(), resp);
        }
        if (resp.hasAcquireSemaphoreResult()) {
            this.onNextMessage(resp.getAcquireSemaphoreResult().getReqId(), resp);
        }
        if (resp.hasReleaseSemaphoreResult()) {
            this.onNextMessage(resp.getReleaseSemaphoreResult().getReqId(), resp);
        }
        if (resp.hasDescribeSemaphoreChanged()) {
            this.onNextMessage(resp.getDescribeSemaphoreChanged().getReqId(), resp);
        }
    }

    public void onNextMessage(long reqId, SessionResponse resp) {
        StreamMsg<?> msg = this.messages.remove(reqId);
        if (msg != null && msg.handleResponse(resp)) {
            StreamMsg<?> old;
            logger.trace("stream {} got response {}", (Object)this.hashCode(), (Object)TextFormat.shortDebugString((MessageOrBuilder)resp));
            StreamMsg<?> nextMsg = msg.nextMsg();
            if (nextMsg != null && (old = this.messages.put(reqId, nextMsg)) != null) {
                old.handleError(Status.of((StatusCode)StatusCode.CLIENT_CANCELLED));
            }
        } else {
            logger.warn("stream {} lost response {}", (Object)this.hashCode(), (Object)TextFormat.shortDebugString((MessageOrBuilder)resp));
        }
    }

    private void onFail(SessionResponse.Failure msg) {
        Status status = Status.of((StatusCode)StatusCode.fromProto((StatusCodesProtos.StatusIds.StatusCode)msg.getStatus()), (Issue[])Issue.fromPb((List)msg.getIssuesList()));
        logger.trace("stream {} got fail message {}", (Object)this.hashCode(), (Object)status);
        this.stopFuture.complete(status);
        this.startFuture.complete((Result<Long>)Result.fail((Status)status));
    }

    private void onSessionStarted(SessionResponse.SessionStarted msg) {
        long id = msg.getSessionId();
        if (this.startFuture.complete((Result<Long>)Result.success((Object)id))) {
            logger.trace("stream {} started with id {}", (Object)this.hashCode(), (Object)id);
        } else {
            logger.warn("stream {} lost the start message with id {}", (Object)this.hashCode(), (Object)id);
        }
    }

    private void onSessionStopped(SessionResponse.SessionStopped msg) {
        logger.trace("stream {} stopped with id {}", (Object)this.hashCode(), (Object)msg.getSessionId());
        this.stream.close();
    }

    private void onPing(SessionResponse.PingPong msg) {
        long opaque = msg.getOpaque();
        SessionRequest pong = SessionRequest.newBuilder().setPong(SessionRequest.PingPong.newBuilder().setOpaque(opaque).build()).build();
        logger.trace("stream {} got ping msg {}, sending pong msg", (Object)this.hashCode(), (Object)Long.toUnsignedString(opaque));
        this.stream.sendNext((Object)pong);
    }
}

