package org.oracle.okafka.clients;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import org.oracle.okafka.common.Cluster;
import org.oracle.okafka.common.Node;
import org.oracle.okafka.common.errors.AuthenticationException;
import org.oracle.okafka.common.errors.InvalidLoginCredentialsException;
import org.oracle.okafka.common.metrics.Sensor;
import org.oracle.okafka.common.network.AQClient;
import org.oracle.okafka.common.requests.AbstractRequest;
import org.oracle.okafka.common.requests.MetadataRequest;
import org.oracle.okafka.common.requests.MetadataResponse;
import org.oracle.okafka.common.requests.RequestHeader;
import org.oracle.okafka.common.utils.LogContext;
import org.oracle.okafka.common.utils.Time;
import org.slf4j.Logger;

/* loaded from: input_file:org/oracle/okafka/clients/NetworkClient.class */
/**
 * {@link KafkaClient} implementation that delegates connection handling and request
 * transmission to an {@link AQClient} and tracks per-node connection state in a
 * {@link ClusterConnectionStates}.
 *
 * <p>Requests are sent synchronously: {@link #send(ClientRequest, long)} returns the
 * {@link ClientResponse} produced by {@link AQClient#send}. Metadata refreshes are driven
 * through a {@link MetadataUpdater}; when none is supplied, a {@link DefaultMetadataUpdater}
 * wrapping the given {@link Metadata} is used.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to {@code correlation} or to the
 * updater's in-progress flag; presumably instances are confined to a single thread — confirm
 * against callers.
 */
public class NetworkClient implements KafkaClient {
    private final Logger log;
    private final AQClient aqClient;
    private final MetadataUpdater metadataUpdater;
    /** Source of the random starting index used by {@link #leastLoadedNode(long)}. */
    private final Random randOffset;
    private final ClusterConnectionStates connectionStates;
    private final int socketSendBuffer;
    private final int socketReceiveBuffer;
    private final String clientId;
    /** Next correlation id handed out by {@code newClientRequest}; incremented per request. */
    private int correlation;
    private final int defaultRequestTimeoutMs;
    private final long reconnectBackoffMs;
    private final Time time;
    private final Sensor throttleTimeSensor;

    /**
     * Default {@link MetadataUpdater}: refreshes a {@link Metadata} instance by sending a
     * {@link MetadataRequest} through the enclosing {@link NetworkClient}.
     */
    class DefaultMetadataUpdater implements MetadataUpdater {
        private final Metadata metadata;
        /** True only while a metadata request issued by this updater is being sent. */
        private boolean metadataFetchInProgress = false;

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
        }

        /** Returns the nodes of the cluster currently held by the wrapped metadata. */
        @Override // org.oracle.okafka.clients.MetadataUpdater
        public List<Node> fetchNodes() {
            return this.metadata.fetch().nodes();
        }

        /**
         * Reports whether a refresh should be sent now: no fetch is in progress and the
         * metadata reports zero time until its next update.
         *
         * @param j current time, as passed to {@code Metadata.timeToNextUpdate}
         */
        @Override // org.oracle.okafka.clients.MetadataUpdater
        public boolean isUpdateDue(long j) {
            return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(j) == 0;
        }

        /**
         * Sends a metadata request if one is due and a node is available.
         *
         * @param j current time
         * @return the time to wait before calling again: the remaining time until an update is
         *         due, the reconnect backoff when no node can be used, or the default request
         *         timeout after a request has been sent
         */
        @Override // org.oracle.okafka.clients.MetadataUpdater
        public long maybeUpdate(long j) {
            long waitForFetch = this.metadataFetchInProgress ? NetworkClient.this.defaultRequestTimeoutMs : 0L;
            long timeToUpdate = Math.max(this.metadata.timeToNextUpdate(j), waitForFetch);
            if (timeToUpdate > 0) {
                return timeToUpdate;
            }
            Node target = NetworkClient.this.leastLoadedNode(j);
            if (target != null) {
                return maybeUpdate(j, target);
            }
            NetworkClient.this.log.debug("Give up sending metadata request since no node is available");
            return NetworkClient.this.reconnectBackoffMs;
        }

        /**
         * Applies a metadata response: logs any topic errors, then either updates the metadata
         * with the returned cluster or records a failed update when the cluster has no nodes.
         *
         * @param requestHeader    header of the request this response answers
         * @param j                time of the response, forwarded to the metadata
         * @param metadataResponse the response to apply
         */
        @Override // org.oracle.okafka.clients.MetadataUpdater
        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long j, MetadataResponse metadataResponse) {
            this.metadataFetchInProgress = false;
            Cluster cluster = metadataResponse.cluster(this.metadata.getConfigs());
            Map<String, Exception> topicErrors = metadataResponse.topicErrors();
            if (!topicErrors.isEmpty()) {
                NetworkClient.this.log.warn("Error while fetching metadata : {}", topicErrors);
            }
            if (cluster.nodes().size() > 0) {
                this.metadata.update(cluster, null, j);
            } else {
                NetworkClient.this.log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                this.metadata.failedUpdate(j, null);
            }
        }

        /** Intentionally a no-op in this implementation. */
        @Override // org.oracle.okafka.clients.MetadataUpdater
        public void handleDisconnection(String str) {
        }

        /**
         * Clears the in-progress flag and, if an update had been requested, records the failure
         * on the metadata with the current time.
         */
        @Override // org.oracle.okafka.clients.MetadataUpdater
        public void handleAuthenticationFailure(AuthenticationException authenticationException) {
            this.metadataFetchInProgress = false;
            if (this.metadata.updateRequested()) {
                this.metadata.failedUpdate(NetworkClient.this.time.milliseconds(), authenticationException);
            }
        }

        /** Asks the wrapped metadata to refresh at the next opportunity. */
        @Override // org.oracle.okafka.clients.MetadataUpdater
        public void requestUpdate() {
            this.metadata.requestUpdate();
        }

        /** Closes the enclosing client's {@link AQClient} and then the wrapped metadata. */
        @Override // org.oracle.okafka.clients.MetadataUpdater, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            NetworkClient.this.aqClient.close();
            this.metadata.close();
        }

        /**
         * Connects to {@code node} if necessary and sends a metadata request to it.
         *
         * @param j    current time
         * @param node node to query
         * @return the reconnect backoff when the node cannot be reached, otherwise the default
         *         request timeout
         */
        private long maybeUpdate(long j, Node node) {
            if (!NetworkClient.this.canSendRequest(node, j)) {
                if (!NetworkClient.this.connectionStates.canConnect(node, j)) {
                    return NetworkClient.this.reconnectBackoffMs;
                }
                NetworkClient.this.log.debug("Initialize connection to node {} for sending metadata request", node);
                try {
                    if (!NetworkClient.this.initiateConnect(node, j)) {
                        return NetworkClient.this.reconnectBackoffMs;
                    }
                } catch (InvalidLoginCredentialsException e) {
                    NetworkClient.this.log.error("Failed to connect to node {} with error {}", node, e.getMessage());
                    this.metadata.failedUpdate(j, new AuthenticationException(e.getMessage()));
                    return NetworkClient.this.reconnectBackoffMs;
                }
            }
            this.metadataFetchInProgress = true;
            try {
                MetadataRequest.Builder request = this.metadata.needMetadataForAllTopics()
                        ? MetadataRequest.Builder.allTopics()
                        : new MetadataRequest.Builder(new ArrayList<>(this.metadata.topics()), this.metadata.allowAutoTopicCreation());
                NetworkClient.this.log.debug("Sending metadata request {} to node {}", request, node);
                NetworkClient.this.sendInternalMetadataRequest(request, node, j);
            } finally {
                // The send is synchronous, so the fetch is over once we get here. Resetting in
                // finally keeps a thrown exception from leaving the flag stuck at true, which
                // would otherwise suppress every later metadata refresh.
                this.metadataFetchInProgress = false;
            }
            return NetworkClient.this.defaultRequestTimeoutMs;
        }
    }

    /**
     * Creates a client that refreshes {@code metadata} through a {@link DefaultMetadataUpdater}
     * and has no throttle-time sensor. Parameters are as documented on the private constructor.
     */
    public NetworkClient(AQClient aQClient, Metadata metadata, String str, long j, long j2, int i, int i2, int i3, Time time, LogContext logContext) {
        this(null, metadata, aQClient, str, j, j2, i, i2, i3, time, null, logContext);
    }

    /**
     * Creates a client that refreshes {@code metadata} through a {@link DefaultMetadataUpdater}
     * and stores the given sensor. Parameters are as documented on the private constructor.
     */
    public NetworkClient(AQClient aQClient, Metadata metadata, String str, long j, long j2, int i, int i2, int i3, Time time, Sensor sensor, LogContext logContext) {
        this(null, metadata, aQClient, str, j, j2, i, i2, i3, time, sensor, logContext);
    }

    /**
     * Creates a client that uses a caller-supplied {@link MetadataUpdater}.
     * Parameters are as documented on the private constructor.
     */
    public NetworkClient(AQClient aQClient, MetadataUpdater metadataUpdater, String str, long j, long j2, int i, int i2, int i3, Time time, LogContext logContext) {
        this(metadataUpdater, null, aQClient, str, j, j2, i, i2, i3, time, null, logContext);
    }

    /**
     * Shared constructor.
     *
     * @param metadataUpdater updater to use, or {@code null} to build a default one
     * @param metadata        metadata for the default updater; required when
     *                        {@code metadataUpdater} is {@code null}
     * @param aQClient        transport used to connect, send and close
     * @param str             client id placed on every created request
     * @param j               reconnect backoff; also the first argument to
     *                        {@link ClusterConnectionStates}
     * @param j2              second argument to {@link ClusterConnectionStates}
     *                        (presumably the maximum reconnect backoff — confirm)
     * @param i               socket send buffer size (stored only)
     * @param i2              socket receive buffer size (stored only)
     * @param i3              default request timeout
     * @param time            clock used for timestamps
     * @param sensor          throttle-time sensor (stored only), may be {@code null}
     * @param logContext      factory for this client's logger
     * @throws IllegalArgumentException if both {@code metadataUpdater} and {@code metadata}
     *                                  are {@code null}
     */
    private NetworkClient(MetadataUpdater metadataUpdater, Metadata metadata, AQClient aQClient, String str, long j, long j2, int i, int i2, int i3, Time time, Sensor sensor, LogContext logContext) {
        if (metadataUpdater != null) {
            this.metadataUpdater = metadataUpdater;
        } else {
            if (metadata == null) {
                throw new IllegalArgumentException("`metadata` must not be null");
            }
            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
        }
        this.aqClient = aQClient;
        this.clientId = str;
        this.connectionStates = new ClusterConnectionStates(j, j2);
        this.socketSendBuffer = i;
        this.socketReceiveBuffer = i2;
        this.correlation = 0;
        this.randOffset = new Random();
        this.defaultRequestTimeoutMs = i3;
        this.reconnectBackoffMs = j;
        this.time = time;
        this.throttleTimeSensor = sensor;
        this.log = logContext.logger(NetworkClient.class);
    }

    /**
     * Ensures {@code node} can accept requests, connecting to it if the connection state allows.
     *
     * @param node node to prepare
     * @param j    current time
     * @return {@code true} if the node is ready (already, or after a successful connect)
     * @throws IllegalArgumentException if {@code node.isEmpty()}
     */
    @Override // org.oracle.okafka.clients.KafkaClient
    public boolean ready(Node node, long j) {
        if (node.isEmpty()) {
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
        }
        if (isReady(node, j)) {
            return true;
        }
        if (this.connectionStates.canConnect(node, j)) {
            return initiateConnect(node, j);
        }
        return false;
    }

    /** Returns whether the connection state allows a connect attempt to {@code node} now. */
    boolean canConnect(Node node, long j) {
        return this.connectionStates.canConnect(node, j);
    }

    /** Intentionally a no-op in this implementation. */
    @Override // org.oracle.okafka.clients.KafkaClient
    public void disconnect(Node node) {
    }

    /** Exposes the per-node connection state tracker. */
    public ClusterConnectionStates getConnectionStates() {
        return this.connectionStates;
    }

    /** Closes the transport connection to {@code node} and forgets its connection state. */
    @Override // org.oracle.okafka.clients.KafkaClient
    public void close(Node node) {
        this.aqClient.close(node);
        this.connectionStates.remove(node);
    }

    /** Delegates to {@link ClusterConnectionStates#connectionDelay}. */
    @Override // org.oracle.okafka.clients.KafkaClient
    public long connectionDelay(Node node, long j) {
        return this.connectionStates.connectionDelay(node, j);
    }

    /** Delegates to {@link ClusterConnectionStates#pollDelayMs}. */
    @Override // org.oracle.okafka.clients.KafkaClient
    public long pollDelayMs(Node node, long j) {
        return this.connectionStates.pollDelayMs(node, j);
    }

    /** Returns whether the connection state records {@code node} as disconnected. */
    @Override // org.oracle.okafka.clients.KafkaClient
    public boolean connectionFailed(Node node) {
        return this.connectionStates.isDisconnected(node);
    }

    /** Returns the authentication exception recorded for {@code node}, as held by the state tracker. */
    @Override // org.oracle.okafka.clients.KafkaClient
    public AuthenticationException authenticationException(Node node) {
        return this.connectionStates.authenticationException(node);
    }

    /**
     * A node is ready only when no metadata update is due and its connection is ready.
     */
    @Override // org.oracle.okafka.clients.KafkaClient
    public boolean isReady(Node node, long j) {
        return !this.metadataUpdater.isUpdateDue(j) && canSendRequest(node, j);
    }

    /** Connection-level readiness only; ignores whether a metadata update is due. */
    private boolean canSendRequest(Node node, long j) {
        return this.connectionStates.isReady(node, j);
    }

    /**
     * Sends {@code clientRequest} and returns its response.
     *
     * @throws IllegalStateException if the request has a destination that is not ready
     */
    @Override // org.oracle.okafka.clients.KafkaClient
    public ClientResponse send(ClientRequest clientRequest, long j) {
        return doSend(clientRequest, false, j);
    }

    /**
     * Sends a metadata request to {@code node} and hands the response to the metadata updater.
     */
    private void sendInternalMetadataRequest(MetadataRequest.Builder builder, Node node, long j) {
        ClientRequest request = newClientRequest(node, builder, j, true);
        ClientResponse response = doSend(request, true, j);
        this.log.debug("Got response for metadata request {} from node {}", builder, node);
        this.metadataUpdater.handleCompletedMetadataResponse(response.requestHeader(), this.time.milliseconds(), (MetadataResponse) response.responseBody());
    }

    /**
     * Sends a request through the {@link AQClient} and records a disconnection if the
     * response reports one.
     *
     * @param clientRequest request to send
     * @param z             {@code true} for internal requests, which skip the readiness check
     * @param j             current time, used for the readiness check
     * @throws IllegalStateException if a non-internal request targets a node that is not ready
     */
    private ClientResponse doSend(ClientRequest clientRequest, boolean z, long j) {
        Node destination = clientRequest.destination();
        if (destination != null && !z && !canSendRequest(destination, j)) {
            throw new IllegalStateException("Attempt to send a request to node " + destination + " which is not ready.");
        }
        ClientResponse response = this.aqClient.send(clientRequest);
        handleDisconnection(destination, response.wasDisconnected(), this.time.milliseconds());
        return response;
    }

    /** Delegates to {@link ClusterConnectionStates#hasReadyNodes}. */
    @Override // org.oracle.okafka.clients.KafkaClient
    public boolean hasReadyNodes(long j) {
        return this.connectionStates.hasReadyNodes(j);
    }

    /** Delegates to {@link MetadataUpdater#maybeUpdate(long)} and returns its wait time. */
    @Override // org.oracle.okafka.clients.KafkaClient
    public long maybeUpdateMetadata(long j) {
        return this.metadataUpdater.maybeUpdate(j);
    }

    /**
     * Closes the transport and then the metadata updater.
     *
     * <p>NOTE(review): with the default updater, {@code aqClient.close()} runs a second time
     * from {@code DefaultMetadataUpdater.close()}; confirm {@code AQClient.close()} tolerates
     * repeated calls.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.aqClient.close();
        this.metadataUpdater.close();
    }

    /**
     * Picks a node to talk to, scanning the known nodes from a random starting index.
     * The first ready node wins; otherwise the last scanned node that is not blacked out
     * is returned.
     *
     * @param j current time
     * @return the chosen node, or {@code null} if there are no nodes or all are blacked out
     */
    @Override // org.oracle.okafka.clients.KafkaClient
    public Node leastLoadedNode(long j) {
        List<Node> nodes = this.metadataUpdater.fetchNodes();
        int nodeCount = nodes.size();
        if (nodeCount == 0) {
            // Random.nextInt(0) throws IllegalArgumentException; with nothing to pick from,
            // report "no node" the same way as when every node is blacked out.
            this.log.debug("Least loaded node selection failed to find an available node");
            return null;
        }
        Node fallback = null;
        int offset = this.randOffset.nextInt(nodeCount);
        for (int step = 0; step < nodeCount; step++) {
            Node candidate = nodes.get((offset + step) % nodeCount);
            if (isReady(candidate, j)) {
                this.log.debug("Found least loaded node {}", candidate);
                return candidate;
            }
            if (!this.connectionStates.isBlackedOut(candidate, j)) {
                fallback = candidate;
            } else if (this.log.isTraceEnabled()) {
                // This branch is only reached when the node is blacked out, so the flag is true.
                this.log.trace("Removing node {} from least loaded node selection: is-blacked-out: {}", candidate, true);
            }
        }
        if (fallback != null) {
            this.log.debug("Found least loaded node {}", fallback);
        } else {
            this.log.debug("Least loaded node selection failed to find an available node");
        }
        return fallback;
    }

    /** Records {@code node} as disconnected at time {@code j}. */
    @Override // org.oracle.okafka.clients.KafkaClient
    public void disconnected(Node node, long j) {
        this.connectionStates.disconnected(node, j);
    }

    /**
     * Attempts to connect to {@code node}, updating the connection state accordingly.
     * On failure the node is marked disconnected and a metadata update is requested.
     *
     * @param node node to connect to
     * @param j    current time
     * @return {@code true} on success, {@code false} on a failure that is not credential related
     * @throws InvalidLoginCredentialsException if the failure is a {@link JMSSecurityException}
     *                                          or a {@link JMSException} with error code 12505
     */
    private boolean initiateConnect(Node node, long j) {
        try {
            this.log.debug("Initiating connection to node {}", node);
            this.connectionStates.connecting(node, j);
            this.aqClient.connect(node);
            this.connectionStates.ready(node);
            this.log.trace("Connection is established to node {}", node);
            return true;
        } catch (Exception e) {
            // Only JMS failures carry an error code, and it may be null; read it once through
            // a guarded cast and compare constant-first so neither case can throw here.
            String errorCode = (e instanceof JMSException) ? ((JMSException) e).getErrorCode() : null;
            if ("1405".equals(errorCode)) {
                this.log.error("create session privilege is not assigned: {}", e.getMessage());
                this.log.info("create session, execute on dbms_aqin, execute on dbms_aqadm privileges required for producer to work");
            } else if ("6550".equals(errorCode)) {
                this.log.error("execute on dbms_aqin is not assigned: {}", e.getMessage());
                this.log.info("create session, execute on dbms_aqin, dbms_aqadm , dbms_aqjms privileges required for producer or consumer to work");
            }
            this.connectionStates.disconnected(node, j);
            this.metadataUpdater.requestUpdate();
            this.log.warn("Error connecting to node {}", node, e);
            if ((e instanceof JMSSecurityException) || "12505".equals(errorCode)) {
                throw new InvalidLoginCredentialsException("Invalid login details provided:" + e.getMessage());
            }
            return false;
        }
    }

    /**
     * When {@code z} is {@code true}, marks {@code node} disconnected at time {@code j} and
     * requests a metadata update; otherwise does nothing.
     */
    private void handleDisconnection(Node node, boolean z, long j) {
        if (z) {
            disconnected(node, j);
            this.metadataUpdater.requestUpdate();
        }
    }

    /**
     * Creates a request with the default request timeout and no completion handler.
     *
     * @param node    destination node
     * @param builder builder for the request body
     * @param j       creation time
     * @param z       forwarded to {@link ClientRequest} (presumably "expect response" — confirm)
     */
    @Override // org.oracle.okafka.clients.KafkaClient
    public ClientRequest newClientRequest(Node node, AbstractRequest.Builder<?> builder, long j, boolean z) {
        return newClientRequest(node, builder, j, z, this.defaultRequestTimeoutMs, null);
    }

    /**
     * Creates a request carrying the next correlation id and this client's id.
     *
     * @param node                     destination node
     * @param builder                  builder for the request body
     * @param j                        creation time
     * @param z                        forwarded to {@link ClientRequest}
     * @param i                        request timeout for this request
     * @param requestCompletionHandler handler forwarded to {@link ClientRequest}, may be {@code null}
     */
    @Override // org.oracle.okafka.clients.KafkaClient
    public ClientRequest newClientRequest(Node node, AbstractRequest.Builder<?> builder, long j, boolean z, int i, RequestCompletionHandler requestCompletionHandler) {
        int correlationId = this.correlation;
        this.correlation = correlationId + 1;
        // Honour the caller-supplied timeout instead of silently substituting the default.
        return new ClientRequest(node, builder, correlationId, this.clientId, j, z, i, requestCompletionHandler);
    }
}
