package kafka.server.link;

import kafka.cluster.BrokerEndPoint;
import kafka.server.BrokerBlockingSender;
import kafka.server.ClusterLinkQuotas;
import kafka.server.ClusterLinkReplicaQuotaAdapter;
import kafka.server.FailedPartitions;
import kafka.server.FetcherPool;
import kafka.server.FetcherPool$InSync$;
import kafka.server.KafkaConfig;
import kafka.server.PausedPartitions;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkFetcherThread.scala */
/* loaded from: input_file:kafka/server/link/ClusterLinkFetcherThread$.class */
/**
 * Singleton module class (decompiled from the Scala companion object in
 * {@code ClusterLinkFetcherThread.scala}). It exposes the mirror-topic metric
 * name/description constants and the factory methods that wire up
 * {@link ClusterLinkFetcherThread} and {@link ClusterLinkFetcherWithSharedThread}
 * instances.
 */
public final class ClusterLinkFetcherThread$ {
    /** The single instance of this module; all members are reached through it. */
    public static final ClusterLinkFetcherThread$ MODULE$ = new ClusterLinkFetcherThread$();

    private static final String MESSAGE_RATE_METRIC_NAME = "mirror-topic-message-rate-per-second";
    private static final String MESSAGE_RATE_METRIC_DESCRIPTION = "Rate of messages being written to the mirror topic per second";
    private static final String RPO_METRIC_NAME = "mirror-topic-rpo-seconds";
    private static final String RPO_METRIC_DESCRIPTION = "RPO for mirror topic in seconds";

    /** Not instantiable from outside; use {@link #MODULE$}. */
    private ClusterLinkFetcherThread$() {
    }

    /**
     * Compiler-generated accessor whose name encodes {@code <init>$default$17},
     * i.e. presumably the default for the 17th constructor parameter of
     * {@code ClusterLinkFetcherThread} (the optional log context).
     *
     * @return always {@code None}
     */
    public Option<LogContext> $lessinit$greater$default$17() {
        return None$.MODULE$;
    }

    /** @return the metric name {@code "mirror-topic-message-rate-per-second"} */
    public String mirrorTopicMessageRateMetricName() {
        return MESSAGE_RATE_METRIC_NAME;
    }

    /** @return the human-readable description of the message-rate metric */
    public String mirrorTopicMessageRateMetricDescription() {
        return MESSAGE_RATE_METRIC_DESCRIPTION;
    }

    /** @return the metric name {@code "mirror-topic-rpo-seconds"} */
    public String mirrorTopicRpoMetricName() {
        return RPO_METRIC_NAME;
    }

    /** @return the human-readable description of the RPO metric */
    public String mirrorTopicRpoMetricDescription() {
        return RPO_METRIC_DESCRIPTION;
    }

    /**
     * Builds the log prefix shared by both factory methods, e.g.
     * {@code "[ClusterLinkFetcher brokerId=1 fetcherId=0 fetcherPool=... source(link=..., leaderId=2)] "}.
     */
    private static String fetcherLogPrefix(int brokerId, int fetcherId, FetcherPool fetcherPool, ClusterLinkMetadata clusterLinkMetadata, BrokerEndPoint brokerEndPoint) {
        return "[ClusterLinkFetcher brokerId=" + brokerId
                + " fetcherId=" + fetcherId
                + " fetcherPool=" + fetcherPool.name()
                + " source(link=" + clusterLinkMetadata.linkName()
                + ", leaderId=" + brokerEndPoint.id()
                + ")] ";
    }

    /**
     * Creates a {@link ClusterLinkFetcherThread} backed by its own
     * {@link ClusterLinkNetworkClient}.
     *
     * <p>The method builds a log context and a set of metric tags, creates the
     * network client, registers that client with the connection manager via
     * {@code enableClusterLink}, and finally constructs the fetcher thread around
     * a {@link BrokerBlockingSender}-based leader endpoint.
     *
     * @param str                           passed through as the first constructor argument of the thread
     * @param i                             fetcher id; used in the log prefix, the {@code "fetcher-id"} tag and the client id
     * @param fetcherPool                   pool this fetcher belongs to; its name appears in the log prefix and client id
     * @param kafkaConfig                   local broker configuration (supplies the broker id)
     * @param clusterLinkConfig             link configuration (supplies socket timeout and fetch backoff)
     * @param clusterLinkMetadata           link metadata (supplies the link name)
     * @param clusterLinkFetcherManager     forwarded to the fetcher thread
     * @param clusterLinkConnectionManager  supplies the max log level and receives the new network client
     * @param brokerEndPoint                source broker; its id is used as the {@code "broker-id"} tag
     * @param failedPartitions              forwarded to the fetcher thread
     * @param pausedPartitions              forwarded to the fetcher thread
     * @param replicaManager                forwarded to the leader endpoint and the fetcher thread
     * @param replicaQuota                  forwarded to the leader endpoint and the fetcher thread
     * @param clusterLinkMetrics            supplies the metrics registry; also forwarded
     * @param time                          clock forwarded to all created components
     * @param option                        forwarded as the last constructor argument of the thread
     * @param function0                     forwarded to the fetcher thread
     * @return the newly constructed fetcher thread
     */
    public ClusterLinkFetcherThread apply(String str, int i, FetcherPool fetcherPool, KafkaConfig kafkaConfig, ClusterLinkConfig clusterLinkConfig, ClusterLinkMetadata clusterLinkMetadata, ClusterLinkFetcherManager clusterLinkFetcherManager, ClusterLinkConnectionManager clusterLinkConnectionManager, BrokerEndPoint brokerEndPoint, FailedPartitions failedPartitions, PausedPartitions pausedPartitions, ReplicaManager replicaManager, ReplicaQuota replicaQuota, ClusterLinkMetrics clusterLinkMetrics, Time time, Option<String> option, Function0<FetchResponseSize> function0) {
        int localBrokerId = kafkaConfig.brokerId();
        LogContext fetcherLogContext = new LogContext(
                fetcherLogPrefix(localBrokerId, i, fetcherPool, clusterLinkMetadata, brokerEndPoint),
                clusterLinkConnectionManager.maxLogLevel());

        // Per-fetcher metric tags; merged with the link-level tags below.
        Tuple2<String, String> brokerIdTag = new Tuple2<>("broker-id", Integer.toString(brokerEndPoint.id()));
        Tuple2<String, String> fetcherIdTag = new Tuple2<>("fetcher-id", Integer.toString(i));
        Map fetcherTags = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{brokerIdTag, fetcherIdTag}));
        // The pool tag is only attached when the fetcher belongs to the InSync pool.
        if (fetcherPool.equals(FetcherPool$InSync$.MODULE$)) {
            fetcherTags.$plus$eq(new Tuple2<>("fetcher-pool", fetcherPool.name()));
        }

        ClusterLinkNetworkClient linkClient = new ClusterLinkNetworkClient(
                kafkaConfig,
                clusterLinkConfig,
                ClusterLinkMetrics$.MODULE$.throttleTimeSensorName(clusterLinkMetadata.linkName()),
                None$.MODULE$,
                new Some(new ManualMetadataUpdater()),
                clusterLinkMetrics.metrics(),
                ClusterLinkFactory$.MODULE$.linkMetricTags(clusterLinkMetadata.linkName()).$plus$plus(fetcherTags),
                time,
                "link-" + clusterLinkMetadata.linkName() + "-broker-" + localBrokerId + "-fetcher-" + i + "-pool-" + fetcherPool.name(),
                ClusterLinkClientType$Fetcher$.MODULE$,
                fetcherLogContext);
        clusterLinkConnectionManager.enableClusterLink(linkClient, None$.MODULE$);

        int socketTimeoutMs = clusterLinkConfig.replicaSocketTimeoutMs();
        BrokerBlockingSender blockingSender = new BrokerBlockingSender(
                brokerEndPoint, kafkaConfig, socketTimeoutMs, time, i, linkClient.networkClient(), None$.MODULE$);

        return new ClusterLinkFetcherThread(
                str,
                i,
                ClusterLinkLeaderEndPoint$.MODULE$.apply(fetcherLogContext, time, blockingSender, linkClient, kafkaConfig, clusterLinkConfig, replicaManager, replicaQuota, clusterLinkMetrics),
                kafkaConfig,
                clusterLinkConfig,
                clusterLinkMetadata,
                clusterLinkFetcherManager,
                failedPartitions,
                pausedPartitions,
                clusterLinkConfig.replicaFetchBackoff(),
                replicaManager,
                replicaQuota,
                clusterLinkMetrics,
                time,
                function0,
                linkClient,
                new Some(fetcherLogContext),
                option);
    }

    /**
     * Creates a {@link ClusterLinkFetcherWithSharedThread} that reuses the network
     * client of an existing {@link ClusterLinkSharedFetcherThread}, and registers
     * the new fetcher with that shared thread under the source broker's id.
     *
     * @param str                             passed as both the first and the fifth constructor argument of the fetcher
     * @param i                               fetcher id; used in the log prefix and forwarded to the sender and fetcher
     * @param fetcherPool                     pool this fetcher belongs to
     * @param uuid                            forwarded as the second constructor argument of the fetcher
     * @param clusterLinkSharedFetcherThread  shared thread supplying the network client and receiving the new fetcher
     * @param kafkaConfig                     local broker configuration (supplies the broker id)
     * @param clusterLinkConfig               link configuration (supplies socket/request timeouts and fetch backoff)
     * @param clusterLinkMetadata             link metadata (supplies the link name)
     * @param clusterLinkFetcherManager       forwarded to the fetcher
     * @param clusterLinkConnectionManager    supplies the max log level for the log context
     * @param brokerEndPoint                  source broker
     * @param failedPartitions                forwarded to the fetcher
     * @param pausedPartitions                forwarded to the fetcher
     * @param replicaManager                  forwarded to the leader endpoint and the fetcher
     * @param clusterLinkQuotas               source of the request quota adapter and the replica quota adapter
     * @param clusterLinkMetrics              forwarded to the leader endpoint and the fetcher
     * @param time                            clock forwarded to all created components
     * @param option                          forwarded to both quota adapters and to the fetcher
     * @param function0                       forwarded to the fetcher
     * @return the newly constructed fetcher, already added to the shared thread
     */
    public ClusterLinkFetcherWithSharedThread createFetcherWithSharedThread(String str, int i, FetcherPool fetcherPool, Uuid uuid, ClusterLinkSharedFetcherThread clusterLinkSharedFetcherThread, KafkaConfig kafkaConfig, ClusterLinkConfig clusterLinkConfig, ClusterLinkMetadata clusterLinkMetadata, ClusterLinkFetcherManager clusterLinkFetcherManager, ClusterLinkConnectionManager clusterLinkConnectionManager, BrokerEndPoint brokerEndPoint, FailedPartitions failedPartitions, PausedPartitions pausedPartitions, ReplicaManager replicaManager, ClusterLinkQuotas clusterLinkQuotas, ClusterLinkMetrics clusterLinkMetrics, Time time, Option<String> option, Function0<FetchResponseSize> function0) {
        LogContext fetcherLogContext = new LogContext(
                fetcherLogPrefix(kafkaConfig.brokerId(), i, fetcherPool, clusterLinkMetadata, brokerEndPoint),
                clusterLinkConnectionManager.maxLogLevel());

        // No new network client here: the one owned by the shared thread is reused.
        ClusterLinkNetworkClient sharedClient = clusterLinkSharedFetcherThread.clusterLinkClient();

        int socketTimeoutMs = clusterLinkConfig.replicaSocketTimeoutMs();
        int requestTimeoutMs = clusterLinkConfig.requestTimeoutMs();
        ClusterLinkAsyncSender asyncSender = new ClusterLinkAsyncSender(
                brokerEndPoint,
                kafkaConfig,
                socketTimeoutMs,
                requestTimeoutMs,
                time,
                clusterLinkQuotas.requestQuotaAdapter(clusterLinkMetadata.linkName(), option),
                i,
                fetcherPool,
                sharedClient.networkClient());

        ClusterLinkReplicaQuotaAdapter quotaAdapter =
                new ClusterLinkReplicaQuotaAdapter(clusterLinkQuotas, option, clusterLinkMetadata.linkName());

        ClusterLinkFetcherWithSharedThread sharedFetcher = new ClusterLinkFetcherWithSharedThread(
                str,
                uuid,
                i,
                fetcherPool,
                str,
                brokerEndPoint,
                clusterLinkSharedFetcherThread,
                asyncSender,
                ClusterLinkLeaderEndPoint$.MODULE$.createAsyncLeaderEndpoint(fetcherLogContext, time, asyncSender, sharedClient, kafkaConfig, clusterLinkConfig, replicaManager, quotaAdapter, clusterLinkMetrics),
                kafkaConfig,
                clusterLinkConfig,
                clusterLinkMetadata,
                clusterLinkFetcherManager,
                failedPartitions,
                pausedPartitions,
                clusterLinkConfig.replicaFetchBackoff(),
                replicaManager,
                quotaAdapter,
                clusterLinkMetrics,
                time,
                function0,
                sharedClient,
                fetcherLogContext,
                option);

        clusterLinkSharedFetcherThread.addFetcher(brokerEndPoint.id(), sharedFetcher);
        return sharedFetcher;
    }
}
