package datahub.shaded.org.apache.kafka.clients.consumer.internals;

import datahub.shaded.org.apache.kafka.clients.ApiVersions;
import datahub.shaded.org.apache.kafka.clients.ClientRequest;
import datahub.shaded.org.apache.kafka.clients.ClientResponse;
import datahub.shaded.org.apache.kafka.clients.ClientUtils;
import datahub.shaded.org.apache.kafka.clients.KafkaClient;
import datahub.shaded.org.apache.kafka.clients.Metadata;
import datahub.shaded.org.apache.kafka.clients.NetworkClientUtils;
import datahub.shaded.org.apache.kafka.clients.RequestCompletionHandler;
import datahub.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import datahub.shaded.org.apache.kafka.common.Node;
import datahub.shaded.org.apache.kafka.common.errors.DisconnectException;
import datahub.shaded.org.apache.kafka.common.errors.TimeoutException;
import datahub.shaded.org.apache.kafka.common.metrics.Metrics;
import datahub.shaded.org.apache.kafka.common.metrics.Sensor;
import datahub.shaded.org.apache.kafka.common.requests.AbstractRequest;
import datahub.shaded.org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import datahub.shaded.org.apache.kafka.common.utils.LogContext;
import datahub.shaded.org.apache.kafka.common.utils.Time;
import datahub.shaded.org.apache.kafka.common.utils.Timer;
import datahub.shaded.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.class */
public class NetworkClientDelegate implements AutoCloseable {
    // Collaborators and configuration values captured once by the constructor.
    private final KafkaClient client;
    private final BackgroundEventHandler backgroundEventHandler;
    private final Metadata metadata;
    private final Time time;
    private final Logger log;
    private final int requestTimeoutMs;
    private final long retryBackoffMs;
    private final boolean notifyMetadataErrorsViaErrorQueue;
    private final AsyncConsumerMetrics asyncConsumerMetrics;
    // Requests accepted by add() that have not yet been handed over to the client.
    // Uses the diamond form instead of a raw ArrayDeque to avoid an unchecked assignment.
    private final Queue<UnsentRequest> unsentRequests = new ArrayDeque<>();
    // Most recent metadata exception stored by maybePropagateMetadataError() when errors are
    // not forwarded to the background event handler; reset by getAndClearMetadataError().
    private Optional<Exception> metadataError = Optional.empty();

    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate$FutureCompletionHandler.class */
    /**
     * Completion handler that exposes the outcome of a request as a {@link CompletableFuture}
     * and remembers the timestamp at which that outcome was recorded.
     */
    public static class FutureCompletionHandler implements RequestCompletionHandler {
        private final CompletableFuture<ClientResponse> future = new CompletableFuture<>();
        private long responseCompletionTimeMs;

        FutureCompletionHandler() {
        }

        /**
         * Fails the future and stores the completion time.
         *
         * @param j completion timestamp to record
         * @param runtimeException cause of the failure; when {@code null} the shared
         *                         {@code DisconnectException.INSTANCE} is used instead
         */
        public void onFailure(long j, RuntimeException runtimeException) {
            this.responseCompletionTimeMs = j;
            RuntimeException cause = runtimeException == null ? DisconnectException.INSTANCE : runtimeException;
            this.future.completeExceptionally(cause);
        }

        /**
         * @return the timestamp recorded by the last call to {@code onFailure} or {@code onComplete}
         */
        public long completionTimeMs() {
            return this.responseCompletionTimeMs;
        }

        /**
         * Translates a client response into either a failed or a successfully completed future.
         * Failure conditions are checked in order: authentication error, disconnect, version mismatch.
         *
         * @param clientResponse the response delivered by the client
         */
        @Override
        public void onComplete(ClientResponse clientResponse) {
            final long receivedAt = clientResponse.receivedTimeMs();

            RuntimeException authFailure = clientResponse.authenticationException();
            if (authFailure != null) {
                onFailure(receivedAt, authFailure);
                return;
            }
            if (clientResponse.wasDisconnected()) {
                onFailure(receivedAt, DisconnectException.INSTANCE);
                return;
            }
            RuntimeException versionFailure = clientResponse.versionMismatch();
            if (versionFailure != null) {
                onFailure(receivedAt, versionFailure);
                return;
            }

            // No failure condition matched: hand the response to whoever waits on the future.
            this.responseCompletionTimeMs = receivedAt;
            this.future.complete(clientResponse);
        }

        /**
         * @return the future that is completed by {@code onComplete} or {@code onFailure}
         */
        public CompletableFuture<ClientResponse> future() {
            return this.future;
        }
    }

    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate$PollResult.class */
    public static class PollResult {
        public static final long WAIT_FOREVER = Long.MAX_VALUE;
        public static final PollResult EMPTY = new PollResult(WAIT_FOREVER);
        public final long timeUntilNextPollMs;
        public final List<UnsentRequest> unsentRequests;

        public PollResult(long j, List<UnsentRequest> list) {
            this.timeUntilNextPollMs = j;
            this.unsentRequests = Collections.unmodifiableList(list);
        }

        public PollResult(List<UnsentRequest> list) {
            this(WAIT_FOREVER, list);
        }

        public PollResult(UnsentRequest unsentRequest) {
            this((List<UnsentRequest>) Collections.singletonList(unsentRequest));
        }

        public PollResult(long j) {
            this(j, Collections.emptyList());
        }
    }

    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate$UnsentRequest.class */
    /**
     * A request waiting to be sent: the request builder, an optional target node, the handler
     * whose future reports the outcome, plus a timer and enqueue timestamp that are set later.
     */
    public static class UnsentRequest {
        private final AbstractRequest.Builder<?> requestBuilder;
        private final FutureCompletionHandler handler;
        private final Optional<Node> node;
        private Timer timer;
        private long enqueueTimeMs;

        /**
         * @param builder builder of the request to send; must not be {@code null}
         * @param optional the node to send to, or empty when no specific node is requested
         */
        public UnsentRequest(AbstractRequest.Builder<?> builder, Optional<Node> optional) {
            this.requestBuilder = Objects.requireNonNull(builder);
            this.node = optional;
            this.handler = new FutureCompletionHandler();
        }

        /** Replaces the timer with a fresh one obtained from {@code time} for {@code j} milliseconds. */
        void setTimer(Time time, long j) {
            this.timer = time.timer(j);
        }

        /** @return the current timer, or {@code null} if {@code setTimer} has not been called */
        Timer timer() {
            return this.timer;
        }

        private void setEnqueueTimeMs(long j) {
            this.enqueueTimeMs = j;
        }

        private long enqueueTimeMs() {
            return this.enqueueTimeMs;
        }

        /** @return the future owned by this request's handler */
        CompletableFuture<ClientResponse> future() {
            return this.handler.future();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public FutureCompletionHandler handler() {
            return this.handler;
        }

        /**
         * Registers a callback on the handler's future.
         *
         * @param biConsumer callback receiving the response or the failure
         * @return this request, to allow chaining
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        public UnsentRequest whenComplete(BiConsumer<ClientResponse, Throwable> biConsumer) {
            CompletableFuture<ClientResponse> outcome = this.handler.future();
            outcome.whenComplete(biConsumer);
            return this;
        }

        AbstractRequest.Builder<?> requestBuilder() {
            return this.requestBuilder;
        }

        Optional<Node> node() {
            return this.node;
        }

        /**
         * Renders the request for logging. Note that this refreshes the timer (when one is set)
         * so that the reported remaining time is current.
         */
        @Override
        public String toString() {
            final String remaining;
            if (this.timer == null) {
                remaining = "<not set>";
            } else {
                this.timer.update();
                remaining = String.valueOf(this.timer.remainingMs());
            }
            return "UnsentRequest{requestBuilder=" + this.requestBuilder
                    + ", handler=" + this.handler
                    + ", node=" + this.node
                    + ", remainingMs=" + remaining + "}";
        }
    }

    /**
     * Wires the delegate to its collaborators and reads the timeouts it needs from the config.
     *
     * @param time clock used for timers and timestamps
     * @param consumerConfig source of {@code request.timeout.ms} and {@code retry.backoff.ms}
     * @param logContext used to create this class's logger
     * @param kafkaClient the client all network operations are delegated to
     * @param metadata metadata whose pending exceptions are surfaced during {@code poll}
     * @param backgroundEventHandler receives metadata errors when {@code z} is {@code true}
     * @param z whether metadata errors are forwarded to {@code backgroundEventHandler}
     *          instead of being stored for {@code getAndClearMetadataError}
     * @param asyncConsumerMetrics sink for the unsent-request queue metrics
     */
    public NetworkClientDelegate(Time time, ConsumerConfig consumerConfig, LogContext logContext, KafkaClient kafkaClient, Metadata metadata, BackgroundEventHandler backgroundEventHandler, boolean z, AsyncConsumerMetrics asyncConsumerMetrics) {
        // Plain collaborator references.
        this.client = kafkaClient;
        this.time = time;
        this.metadata = metadata;
        this.backgroundEventHandler = backgroundEventHandler;
        this.asyncConsumerMetrics = asyncConsumerMetrics;
        this.notifyMetadataErrorsViaErrorQueue = z;

        // Values derived from the arguments.
        this.log = logContext.logger(getClass());
        this.requestTimeoutMs = consumerConfig.getInt("request.timeout.ms");
        this.retryBackoffMs = consumerConfig.getLong("retry.backoff.ms");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the live queue of requests that have been added but not yet sent (not a copy)
     */
    public Queue<UnsentRequest> unsentRequests() {
        return unsentRequests;
    }

    /**
     * @return the in-flight request count reported by the underlying client
     */
    public int inflightRequestCount() {
        final int inFlight = client.inFlightRequestCount();
        return inFlight;
    }

    /**
     * Delegates to {@code NetworkClientUtils.isUnavailable} using this delegate's client and clock.
     *
     * @param node the node to check
     * @return the result of the delegated check
     */
    public boolean isUnavailable(Node node) {
        return NetworkClientUtils.isUnavailable(client, node, time);
    }

    /**
     * Delegates to {@code NetworkClientUtils.maybeThrowAuthFailure} using this delegate's client.
     *
     * @param node the node to check
     */
    public void maybeThrowAuthFailure(Node node) {
        NetworkClientUtils.maybeThrowAuthFailure(client, node);
    }

    /**
     * Delegates to {@code NetworkClientUtils.tryConnect} using this delegate's client and clock.
     *
     * @param node the node to connect to
     */
    public void tryConnect(Node node) {
        NetworkClientUtils.tryConnect(client, node, time);
    }

    /**
     * Runs one network cycle: tries to send queued requests, polls the client, surfaces any
     * metadata error, fails requests whose target node has a failed connection, and records
     * the resulting queue size.
     *
     * @param j requested poll timeout in milliseconds
     * @param j2 timestamp passed through to the send, poll, disconnect-check and metric calls
     */
    public void poll(long j, long j2) {
        trySend(j2);

        // While requests remain queued, never wait longer than the retry backoff.
        final long pollTimeoutMs = unsentRequests.isEmpty() ? j : Math.min(retryBackoffMs, j);
        client.poll(pollTimeoutMs, j2);

        maybePropagateMetadataError();
        checkDisconnects(j2);
        asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(), j2);
    }

    /**
     * Checks the metadata for a pending exception. If one is thrown it is either forwarded to the
     * background event handler as an {@code ErrorEvent} or stored for a later
     * {@code getAndClearMetadataError} call, depending on {@code notifyMetadataErrorsViaErrorQueue}.
     */
    private void maybePropagateMetadataError() {
        try {
            metadata.maybeThrowAnyException();
        } catch (Exception e) {
            if (!notifyMetadataErrorsViaErrorQueue) {
                metadataError = Optional.of(e);
                return;
            }
            backgroundEventHandler.add(new ErrorEvent(e));
        }
    }

    /**
     * @return {@code true} if the client has in-flight requests or the unsent queue is non-empty
     */
    public boolean hasAnyPendingRequests() {
        if (client.hasInFlightRequests()) {
            return true;
        }
        return !unsentRequests.isEmpty();
    }

    /**
     * Walks the unsent queue once. Expired requests are removed and failed with a
     * {@code TimeoutException}; requests that {@code doSend} accepts are removed; everything
     * else stays queued. The time spent in the queue is recorded for every removed request.
     *
     * @param j timestamp used to refresh each request's timer and passed to {@code doSend}
     */
    private void trySend(long j) {
        for (Iterator<UnsentRequest> pending = unsentRequests.iterator(); pending.hasNext(); ) {
            final UnsentRequest request = pending.next();
            final Timer requestTimer = request.timer;
            requestTimer.update(j);

            final boolean expired = requestTimer.isExpired();
            if (!expired && !doSend(request, j)) {
                // Neither expired nor sent: leave it queued for a later pass.
                continue;
            }

            pending.remove();
            asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - request.enqueueTimeMs());
            if (expired) {
                request.handler.onFailure(j, new TimeoutException("Failed to send request after " + requestTimer.timeoutMs() + " ms."));
            }
        }
    }

    /**
     * Attempts to hand a single request to the client.
     *
     * <p>The target is the request's own node when present; otherwise the client's least loaded
     * node is looked up. The lookup is deferred with {@code orElseGet} so it only runs when the
     * request does not name a node.
     *
     * @param unsentRequest the request to send
     * @param j timestamp passed to the client calls
     * @return {@code true} if the request was passed to {@code client.send}; {@code false} if no
     *         usable node was found or the node is not ready, in which case the caller keeps
     *         the request queued
     */
    boolean doSend(UnsentRequest unsentRequest, long j) {
        Node orElse = unsentRequest.node.orElseGet(() -> this.client.leastLoadedNode(j).node());
        if (orElse == null || nodeUnavailable(orElse)) {
            this.log.debug("No broker available to send the request: {}. Retrying.", unsentRequest);
            return false;
        }
        ClientRequest makeClientRequest = makeClientRequest(unsentRequest, orElse, j);
        if (this.client.ready(orElse, j)) {
            this.client.send(makeClientRequest, j);
            return true;
        }
        this.log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", orElse, unsentRequest);
        return false;
    }

    /**
     * Removes and fails every queued request that targets a specific node whose connection the
     * client reports as failed. The failure cause is the client's authentication exception for
     * that node, which may be {@code null}.
     *
     * @param j timestamp recorded as the completion time of each failed request
     */
    protected void checkDisconnects(long j) {
        for (Iterator<UnsentRequest> pending = unsentRequests.iterator(); pending.hasNext(); ) {
            final UnsentRequest request = pending.next();
            if (!request.node.isPresent()) {
                // Requests without a fixed target are not affected here.
                continue;
            }
            if (!client.connectionFailed(request.node.get())) {
                continue;
            }
            pending.remove();
            asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - request.enqueueTimeMs());
            request.handler.onFailure(j, client.authenticationException(request.node.get()));
        }
    }

    /**
     * Builds the client request for {@code unsentRequest} addressed to {@code node}, using the
     * request timer's remaining time (narrowed to {@code int}) as the request timeout.
     *
     * @param unsentRequest the queued request
     * @param node destination node
     * @param j timestamp passed to the client as the creation time argument
     * @return the request created by the client
     */
    private ClientRequest makeClientRequest(UnsentRequest unsentRequest, Node node, long j) {
        final String destination = node.idString();
        final int remainingTimeoutMs = (int) unsentRequest.timer.remainingMs();
        return client.newClientRequest(destination, unsentRequest.requestBuilder, j, true, remainingTimeoutMs, unsentRequest.handler);
    }

    /**
     * Returns the stored metadata error, if any, and resets the stored value to empty.
     *
     * @return the previously stored error, or an empty {@code Optional}
     */
    public Optional<Exception> getAndClearMetadataError() {
        try {
            return metadataError;
        } finally {
            // Runs after the return value has been captured, so the caller still sees the old value.
            metadataError = Optional.empty();
        }
    }

    /**
     * @return the node of the client's least-loaded-node lookup at the current time; the
     *         lookup result's {@code node()} is returned as-is
     */
    public Node leastLoadedNode() {
        final long nowMs = time.milliseconds();
        return client.leastLoadedNode(nowMs).node();
    }

    /**
     * Forwards the wakeup call to the underlying client.
     */
    public void wakeup() {
        client.wakeup();
    }

    /**
     * @param node the node to check
     * @return {@code true} only if the client reports the connection as failed and a positive
     *         connection delay for the node at the current time
     */
    public boolean nodeUnavailable(Node node) {
        if (!client.connectionFailed(node)) {
            return false;
        }
        return client.connectionDelay(node, time.milliseconds()) > 0;
    }

    /**
     * Closes the underlying client.
     *
     * @throws IOException if closing the client fails
     */
    @Override
    public void close() throws IOException {
        client.close();
    }

    /**
     * Queues every request carried by the poll result.
     *
     * @param pollResult the result to take requests from; must not be {@code null}
     * @return the result's {@code timeUntilNextPollMs}
     */
    public long addAll(PollResult pollResult) {
        final PollResult result = Objects.requireNonNull(pollResult);
        addAll(result.unsentRequests);
        return result.timeUntilNextPollMs;
    }

    /**
     * Queues each request of the list, in list order, via {@code add}.
     *
     * @param list the requests to queue; must not be {@code null}; an empty list is a no-op
     */
    public void addAll(List<UnsentRequest> list) {
        Objects.requireNonNull(list);
        for (UnsentRequest request : list) {
            add(request);
        }
    }

    /**
     * Queues a request: starts its timer with the configured request timeout, stamps the
     * enqueue time and appends it to the unsent queue.
     *
     * @param unsentRequest the request to queue; must not be {@code null}
     */
    public void add(UnsentRequest unsentRequest) {
        final UnsentRequest request = Objects.requireNonNull(unsentRequest);
        request.setTimer(time, requestTimeoutMs);
        request.setEnqueueTimeMs(time.milliseconds());
        unsentRequests.add(request);
    }

    /**
     * Returns a {@code CachedSupplier} whose {@code create()} builds a network client from the
     * given arguments and wraps it in a new {@code NetworkClientDelegate}.
     *
     * <p>The client is created with the metric group prefix {@code "consumer"} and the literal
     * {@code 100} as the seventh argument of {@code ClientUtils.createNetworkClient}
     * (presumably the max in-flight requests per connection; confirm against that method).
     *
     * @param time clock shared by the client and the delegate
     * @param logContext log context shared by the client and the delegate
     * @param consumerMetadata metadata passed to both the client and the delegate
     * @param consumerConfig configuration passed to both the client and the delegate
     * @param apiVersions passed to the client factory
     * @param metrics passed to the client factory
     * @param sensor passed to the client factory
     * @param clientTelemetrySender passed to the client factory
     * @param backgroundEventHandler passed to the delegate
     * @param z passed to the delegate as its metadata-error routing flag
     * @param asyncConsumerMetrics passed to the delegate
     * @return a supplier producing the delegate
     */
    public static Supplier<NetworkClientDelegate> supplier(final Time time, final LogContext logContext, final ConsumerMetadata consumerMetadata, final ConsumerConfig consumerConfig, final ApiVersions apiVersions, final Metrics metrics, final Sensor sensor, final ClientTelemetrySender clientTelemetrySender, final BackgroundEventHandler backgroundEventHandler, final boolean z, final AsyncConsumerMetrics asyncConsumerMetrics) {
        return new CachedSupplier<NetworkClientDelegate>() { // from class: datahub.shaded.org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.CachedSupplier
            public NetworkClientDelegate create() {
                // Reference the captured consumerConfig parameter directly. ConsumerConfig is not an
                // enclosing class of this anonymous class, so "ConsumerConfig.this" does not compile.
                KafkaClient networkClient = ClientUtils.createNetworkClient(consumerConfig, metrics, "consumer", logContext, apiVersions, time, 100, consumerMetadata, sensor, clientTelemetrySender);
                return new NetworkClientDelegate(time, consumerConfig, logContext, networkClient, consumerMetadata, backgroundEventHandler, z, asyncConsumerMetrics);
            }
        };
    }
}
