package org.apache.hugegraph.computer.core.network.session;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.network.TransportConf;
import org.apache.hugegraph.computer.core.network.TransportState;
import org.apache.hugegraph.computer.core.network.buffer.NioBuffer;
import org.apache.hugegraph.computer.core.network.message.DataMessage;
import org.apache.hugegraph.computer.core.network.message.FinishMessage;
import org.apache.hugegraph.computer.core.network.message.Message;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.network.message.StartMessage;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/network/session/ClientSession.class */
public class ClientSession extends TransportSession {
    private static final Logger LOG = Log.logger(ClientSession.class);
    private final int maxPendingRequests;
    private final int minPendingRequests;
    private final Lock lock;
    private volatile boolean flowBlocking;
    private final AtomicReference<CompletableFuture<Void>> startedFutureRef;
    private final AtomicReference<CompletableFuture<Void>> finishedFutureRef;
    private final Function<Message, Future<Void>> sendFunction;

    public ClientSession(TransportConf transportConf, Function<Message, Future<Void>> function) {
        super(transportConf);
        this.maxPendingRequests = this.conf.maxPendingRequests();
        this.minPendingRequests = this.conf.minPendingRequests();
        this.lock = new ReentrantLock();
        this.flowBlocking = false;
        this.startedFutureRef = new AtomicReference<>();
        this.finishedFutureRef = new AtomicReference<>();
        this.sendFunction = function;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hugegraph.computer.core.network.session.TransportSession
    public void stateReady() {
        this.flowBlocking = false;
        super.stateReady();
    }

    private void stateStartSent() {
        this.maxRequestId = 0;
        this.state = TransportState.START_SENT;
    }

    private void stateFinishSent(int i) {
        this.finishId = i;
        this.state = TransportState.FINISH_SENT;
    }

    public synchronized void start(long j) throws TransportException {
        CompletableFuture<Void> startAsync = startAsync();
        try {
            try {
                startAsync.get(j, TimeUnit.MILLISECONDS);
                startAsync.cancel(false);
                this.startedFutureRef.compareAndSet(startAsync, null);
            } catch (Throwable th) {
                stateReady();
                if (!(th instanceof TimeoutException)) {
                    throw new TransportException("Failed to wait start-response", th);
                }
                throw new TransportException("Timeout(%sms) to wait start-response", Long.valueOf(j));
            }
        } catch (Throwable th2) {
            startAsync.cancel(false);
            this.startedFutureRef.compareAndSet(startAsync, null);
            throw th2;
        }
    }

    public synchronized CompletableFuture<Void> startAsync() {
        E.checkArgument(this.state == TransportState.READY, "The state must be READY instead of %s at startAsync()", new Object[]{this.state});
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        E.checkArgument(this.startedFutureRef.compareAndSet(null, completableFuture), "The startedFutureRef value must be null at startAsync()", new Object[0]);
        stateStartSent();
        try {
            this.sendFunction.apply(StartMessage.INSTANCE);
            return completableFuture;
        } catch (Throwable th) {
            stateReady();
            completableFuture.cancel(false);
            this.startedFutureRef.compareAndSet(completableFuture, null);
            throw th;
        }
    }

    public synchronized void finish(long j) throws TransportException {
        CompletableFuture<Void> finishAsync = finishAsync();
        try {
            try {
                finishAsync.get(j, TimeUnit.MILLISECONDS);
                this.finishedFutureRef.compareAndSet(finishAsync, null);
            } catch (Throwable th) {
                stateEstablished();
                if (!(th instanceof TimeoutException)) {
                    throw new TransportException("Failed to wait finish-response", th);
                }
                throw new TransportException("Timeout(%sms) to wait finish-response", Long.valueOf(j));
            }
        } catch (Throwable th2) {
            this.finishedFutureRef.compareAndSet(finishAsync, null);
            throw th2;
        }
    }

    public synchronized CompletableFuture<Void> finishAsync() {
        E.checkArgument(this.state == TransportState.ESTABLISHED, "The state must be ESTABLISHED instead of %s at finishAsync()", new Object[]{this.state});
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        E.checkArgument(this.finishedFutureRef.compareAndSet(null, completableFuture), "The finishedFutureRef value must be null at finishAsync()", new Object[0]);
        int genFinishId = genFinishId();
        stateFinishSent(genFinishId);
        try {
            this.sendFunction.apply(new FinishMessage(genFinishId));
            return completableFuture;
        } catch (Throwable th) {
            stateEstablished();
            completableFuture.cancel(false);
            this.finishedFutureRef.compareAndSet(completableFuture, null);
            throw th;
        }
    }

    public synchronized void sendAsync(MessageType messageType, int i, ByteBuffer byteBuffer) {
        E.checkArgument(this.state == TransportState.ESTABLISHED, "The state must be ESTABLISHED instead of %s at sendAsync()", new Object[]{this.state});
        this.sendFunction.apply(new DataMessage(messageType, nextRequestId(), i, new NioBuffer(byteBuffer)));
        updateFlowBlocking();
    }

    public void onRecvAck(int i) {
        switch (this.state) {
            case START_SENT:
                if (i == 0) {
                    onRecvStartAck();
                    return;
                }
                break;
            case FINISH_SENT:
                break;
            case ESTABLISHED:
                onRecvDataAck(i);
                return;
            default:
                throw new ComputerException("Receive one ack message, but the state not match, state: %s, ackId: %s", new Object[]{this.state, Integer.valueOf(i)});
        }
        if (i == this.finishId) {
            onRecvFinishAck();
        } else {
            onRecvDataAck(i);
        }
    }

    private void onRecvStartAck() {
        E.checkArgument(this.state == TransportState.START_SENT, "The state must be START_SENT instead of %s at completeStateStart()", new Object[]{this.state});
        this.maxAckId = 0;
        stateEstablished();
        CompletableFuture<Void> completableFuture = this.startedFutureRef.get();
        if (completableFuture != null) {
            if (!completableFuture.isCancelled() && !completableFuture.complete(null)) {
                LOG.warn("The startedFuture can't be completed");
            }
            this.startedFutureRef.compareAndSet(completableFuture, null);
        }
    }

    private void onRecvFinishAck() {
        E.checkArgument(this.state == TransportState.FINISH_SENT, "The state must be FINISH_SENT instead of %s at completeStateFinish()", new Object[]{this.state});
        stateReady();
        CompletableFuture<Void> completableFuture = this.finishedFutureRef.get();
        if (completableFuture != null) {
            if (!completableFuture.isCancelled() && !completableFuture.complete(null)) {
                LOG.warn("The finishedFuture can't be completed");
            }
            this.finishedFutureRef.compareAndSet(completableFuture, null);
        }
    }

    private void onRecvDataAck(int i) {
        if (i > this.maxAckId) {
            this.maxAckId = i;
        }
        updateFlowBlocking();
    }

    public boolean flowBlocking() {
        return this.flowBlocking;
    }

    private void updateFlowBlocking() {
        this.lock.lock();
        try {
            int i = this.maxRequestId - this.maxAckId;
            if (i >= this.maxPendingRequests) {
                this.flowBlocking = true;
            } else if (i < this.minPendingRequests) {
                this.flowBlocking = false;
            }
        } finally {
            this.lock.unlock();
        }
    }
}
