package alluxio.security.authentication;

import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.UnauthenticatedException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.GrpcChannelKey;
import alluxio.grpc.SaslMessage;
import alluxio.shaded.client.com.google.common.util.concurrent.SettableFuture;
import alluxio.shaded.client.io.grpc.Status;
import alluxio.shaded.client.io.grpc.StatusRuntimeException;
import alluxio.shaded.client.io.grpc.stub.StreamObserver;
import alluxio.util.LogUtils;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.security.sasl.SaslException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/security/authentication/AuthenticatedChannelClientDriver.class */
public class AuthenticatedChannelClientDriver implements StreamObserver<SaslMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(AuthenticatedChannelClientDriver.class);
    private GrpcChannelKey mChannelKey;
    private StreamObserver<SaslMessage> mRequestObserver;
    private SaslClientHandler mSaslClientHandler;
    private volatile boolean mChannelAuthenticated = false;
    private SettableFuture<Void> mChannelAuthenticatedFuture = SettableFuture.create();
    private SaslMessage mInitiateMessage = generateInitialMessage();

    public AuthenticatedChannelClientDriver(SaslClientHandler saslClientHandler, GrpcChannelKey grpcChannelKey) throws SaslException {
        this.mSaslClientHandler = saslClientHandler;
        this.mChannelKey = grpcChannelKey;
    }

    public void setServerObserver(StreamObserver<SaslMessage> streamObserver) {
        this.mRequestObserver = streamObserver;
    }

    @Override // alluxio.shaded.client.io.grpc.stub.StreamObserver
    public void onNext(SaslMessage saslMessage) {
        try {
            LOG.debug("Received message for channel: {}. Message: {}", this.mChannelKey.toStringShort(), saslMessage);
            SaslMessage handleMessage = this.mSaslClientHandler.handleMessage(saslMessage);
            if (handleMessage != null) {
                this.mRequestObserver.onNext(handleMessage);
            } else {
                LOG.debug("Authentication established for {}", this.mChannelKey.toStringShort());
                this.mChannelAuthenticatedFuture.set(null);
            }
        } catch (Throwable th) {
            LOG.debug("Exception while handling message for {}. Message: {}. Error: {}", new Object[]{this.mChannelKey.toStringShort(), saslMessage, th});
            this.mChannelAuthenticatedFuture.setException(th);
            this.mRequestObserver.onError(AlluxioStatusException.fromThrowable(th).toGrpcStatusException());
        }
    }

    @Override // alluxio.shaded.client.io.grpc.stub.StreamObserver
    public void onError(Throwable th) {
        LOG.debug("Authentication stream failed for client. Channel: {}. Error: {}", this.mChannelKey.toStringShort(), th);
        closeAuthenticatedChannel(false);
        this.mChannelAuthenticatedFuture.setException(th);
    }

    @Override // alluxio.shaded.client.io.grpc.stub.StreamObserver
    public void onCompleted() {
        LOG.debug("Authentication revoked by server. Channel: {}", this.mChannelKey.toStringShort());
        closeAuthenticatedChannel(false);
    }

    public void close() {
        LOG.debug("Authentication client-driver closing. Channel: {}", this.mChannelKey.toStringShort());
        closeAuthenticatedChannel(true);
    }

    public boolean isAuthenticated() {
        return this.mChannelAuthenticated;
    }

    public void startAuthenticatedChannel(long j) throws AlluxioStatusException {
        try {
            LOG.debug("Initiating authentication for channel: {}", this.mChannelKey.toStringShort());
            try {
                this.mRequestObserver.onNext(this.mInitiateMessage);
            } catch (StatusRuntimeException e) {
            }
            waitUntilChannelAuthenticated(j);
        } catch (Throwable th) {
            closeAuthenticatedChannel(true);
            throw AlluxioStatusException.fromThrowable(th);
        }
    }

    private SaslMessage generateInitialMessage() throws SaslException {
        SaslMessage.Builder builder = this.mSaslClientHandler.handleMessage(null).toBuilder();
        builder.setClientId(this.mChannelKey.getChannelId().toString());
        builder.setChannelRef(this.mChannelKey.toStringShort());
        return builder.build();
    }

    private void waitUntilChannelAuthenticated(long j) throws AlluxioStatusException {
        try {
            this.mChannelAuthenticatedFuture.get(j, TimeUnit.MILLISECONDS);
            this.mChannelAuthenticated = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw AlluxioStatusException.fromThrowable(e);
        } catch (ExecutionException e2) {
            AlluxioStatusException fromThrowable = AlluxioStatusException.fromThrowable(e2.getCause());
            if (fromThrowable.getStatusCode() != Status.Code.UNIMPLEMENTED) {
                throw fromThrowable;
            }
            throw new UnauthenticatedException("Authentication is disabled on target server.");
        } catch (TimeoutException e3) {
            throw new UnavailableException(e3);
        }
    }

    private void closeAuthenticatedChannel(boolean z) {
        this.mSaslClientHandler.close();
        this.mChannelAuthenticated = false;
        if (z) {
            try {
                this.mRequestObserver.onCompleted();
            } catch (Exception e) {
                LogUtils.warnWithException(LOG, "Failed signaling server for stream completion for channel: {}.", this.mChannelKey.toStringShort(), e);
            }
        }
    }
}
