package kafka.server.link;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import kafka.api.ApiVersion;
import kafka.api.ApiVersion$;
import kafka.api.KAFKA_2_6_IV0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.DelayedOperations;
import kafka.cluster.IsrChangeListener;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.BlockingSend;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache$;
import kafka.server.OffsetTruncationState;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaFetcherThreadTest;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidClusterLinkException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ClusterLinkFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]g\u0001\u0002\u000b\u0016\u0001qAQ!\t\u0001\u0005\u0002\tBq!\n\u0001C\u0002\u0013%a\u0005\u0003\u00040\u0001\u0001\u0006Ia\n\u0005\ba\u0001\u0011\r\u0011\"\u00032\u0011\u0019A\u0004\u0001)A\u0005e!I\u0011\b\u0001a\u0001\u0002\u0004%IA\u000f\u0005\n}\u0001\u0001\r\u00111A\u0005\n}B\u0011\u0002\u0013\u0001A\u0002\u0003\u0005\u000b\u0015B\u001e\t\u000b%\u0003A\u0011\u000b&\t\u000f\u0005U\u0003\u0001\"\u0011\u0002X!9\u0011q\u000e\u0001\u0005\n\u0005E\u0004bBA=\u0001\u0011\u0005\u0013q\u000b\u0005\b\u0003\u0007\u0003A\u0011AA,\u0011\u001d\t9\t\u0001C\u0001\u0003/Bq!a#\u0001\t\u0013\ti\tC\u0004\u0002\u001e\u0002!\t%a\u0016\t\u000f\u0005=\u0006\u0001\"\u0011\u00022\"I\u00111\u0018\u0001\u0012\u0002\u0013\u0005\u0011Q\u0018\u0005\b\u0003'\u0004A\u0011IA,\u0005q\u0019E.^:uKJd\u0015N\\6GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a+fgRT!AF\f\u0002\t1Lgn\u001b\u0006\u00031e\taa]3sm\u0016\u0014(\"\u0001\u000e\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001!\b\t\u0003=}i\u0011aF\u0005\u0003A]\u0011\u0001DU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$G+Z:u\u0003\u0019a\u0014N\\5u}Q\t1\u0005\u0005\u0002%\u00015\tQ#A\u0007dYV\u001cH/\u001a:MS:\\\u0017\nZ\u000b\u0002OA\u0011\u0001&L\u0007\u0002S)\u0011!fK\u0001\u0005kRLGNC\u0001-\u0003\u0011Q\u0017M^1\n\u00059J#\u0001B+V\u0013\u0012\u000bab\u00197vgR,'\u000fT5oW&#\u0007%A\bdYV\u001cH/\u001a:MS:\\g*Y7f+\u0005\u0011\u0004CA\u001a7\u001b\u0005!$BA\u001b,\u0003\u0011a\u0017M\\4\n\u0005]\"$AB*ue&tw-\u0001\tdYV\u001cH/\u001a:MS:\\g*Y7fA\u0005ia-\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012,\u0012a\u000f\t\u0003IqJ!!P\u000b\u00031\rcWo\u001d;fe2Kgn\u001b$fi\u000eDWM\u001d+ie\u0016\fG-A\tgKR\u001c\u0007.\u001a:UQJ,\u0017\rZ0%KF$\"\u0001\u0011$\u0011\u0005\u0005#U\"\u0001\"\u000b\u0003\r\u000bQa]2bY\u0006L!!\u0012\"\u0003\tUs\u0017\u000e\u001e\u0005\b\u000f\u001e\t\t\u00111\u0001<\u0003\rAH%M\u0001\u000fM\u0016$8\r[3s)\"\u0014X-\u00193!\u0003i\u0019'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1e)IYeJW0hY
F4\u0018\u0011BA\r\u0003G\ti$!\u0013\u0011\u0005ya\u0015BA'\u0018\u0005Q\u0011V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bI\")q*\u0003a\u0001!\u0006!a.Y7f!\t\t\u0006L\u0004\u0002S-B\u00111KQ\u0007\u0002)*\u0011QkG\u0001\u0007yI|w\u000e\u001e \n\u0005]\u0013\u0015A\u0002)sK\u0012,g-\u0003\u000283*\u0011qK\u0011\u0005\u00067&\u0001\r\u0001X\u0001\nM\u0016$8\r[3s\u0013\u0012\u0004\"!Q/\n\u0005y\u0013%aA%oi\")\u0001-\u0003a\u0001C\u0006a1o\\;sG\u0016\u0014%o\\6feB\u0011!-Z\u0007\u0002G*\u0011A-G\u0001\bG2,8\u000f^3s\u0013\t17M\u0001\bCe>\\WM]#oIB{\u0017N\u001c;\t\u000b!L\u0001\u0019A5\u0002\u0019\t\u0014xn[3s\u0007>tg-[4\u0011\u0005yQ\u0017BA6\u0018\u0005-Y\u0015MZ6b\u0007>tg-[4\t\u000b5L\u0001\u0019\u00018\u0002!\u0019\f\u0017\u000e\\3e!\u0006\u0014H/\u001b;j_:\u001c\bC\u0001\u0010p\u0013\t\u0001xC\u0001\tGC&dW\r\u001a)beRLG/[8og\")!/\u0003a\u0001g\u0006Q!/\u001a9mS\u000e\fWj\u001a:\u0011\u0005y!\u0018BA;\u0018\u00059\u0011V\r\u001d7jG\u0006l\u0015M\\1hKJDQa^\u0005A\u0002a\fq!\\3ue&\u001c7\u000fE\u0002z\u0003\u000bi\u0011A\u001f\u0006\u0003onT!\u0001`?\u0002\r\r|W.\\8o\u0015\tQbPC\u0002��\u0003\u0003\ta!\u00199bG\",'BAA\u0002\u0003\ry'oZ\u0005\u0004\u0003\u000fQ(aB'fiJL7m\u001d\u0005\b\u0003\u0017I\u0001\u0019AA\u0007\u0003\u0011!\u0018.\\3\u0011\t\u0005=\u0011QC\u0007\u0003\u0003#Q1!a\u0005|\u0003\u0015)H/\u001b7t\u0013\u0011\t9\"!\u0005\u0003\tQKW.\u001a\u0005\b\u00037I\u0001\u0019AA\u000f\u0003\u0015\tXo\u001c;b!\rq\u0012qD\u0005\u0004\u0003C9\"\u0001\u0004*fa2L7-Y)v_R\f\u0007bBA\u0013\u0013\u0001\u0007\u0011qE\u0001\u0011i&,'o\u0015;bi\u00164U\r^2iKJ\u0004R!QA\u0015\u0003[I1!a\u000bC\u0005\u0019y\u0005\u000f^5p]B!\u0011qFA\u001d\u001b\t\t\tD\u0003\u0003\u00024\u0005U\u0012a\u00024fi\u000eDWM\u001d\u0006\u0004\u0003oI\u0012\u0001\u0002;jKJLA!a\u000f\u00022\t\u0001B+[3s'R\fG/\u001a$fi\u000eDWM\u001d\u0005\n\u0003\u007fI\u0001\u0013!a\u0001\u0003\u0003\n!\u0004\\3bI\u0016\u0014XI\u001c3q_&tGO\u00117pG.LgnZ*f]\u0012\u0004R!QA\u0015\u0003\u0007\u00022AHA#\u0013\r\t9e\u0006\u0
002\r\u00052|7m[5oON+g\u000e\u001a\u0005\n\u0003\u0017J\u0001\u0013!a\u0001\u0003\u001b\nQ\u0002\\8h\u0007>tG/\u001a=u\u001fB$\b#B!\u0002*\u0005=\u0003\u0003BA\b\u0003#JA!a\u0015\u0002\u0012\tQAj\\4D_:$X\r\u001f;\u0002\u000f\rdW-\u00198vaR\t\u0001\tK\u0002\u000b\u00037\u0002B!!\u0018\u0002l5\u0011\u0011q\f\u0006\u0005\u0003C\n\u0019'A\u0002ba&TA!!\u001a\u0002h\u00059!.\u001e9ji\u0016\u0014(\u0002BA5\u0003\u0003\tQA[;oSRLA!!\u001c\u0002`\tI\u0011I\u001a;fe\u0016\u000b7\r[\u0001\u0012G2,8\u000f^3s\u0019&t7nQ8oM&<WCAA:!\r!\u0013QO\u0005\u0004\u0003o*\"!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006\u00194\u000f[8vY\u0012,6/\u001a'fC\u0012,'/\u00128e\u001f\u001a47/\u001a;JM&sG/\u001a:Ce>\\WM\u001d,feNLwN\u001c\"fY><(\u0007\r\u0015\u0004\u0019\u0005u\u0004\u0003BA/\u0003\u007fJA!!!\u0002`\t!A+Z:u\u0003u!Xm\u001d;T_V\u00148-Z(gMN,Go\u001d)f]\u0012LgnZ*uCR,\u0007fA\u0007\u0002~\u00051C/Z:u'>,(oY3PM\u001a\u001cX\r^:QK:$\u0017N\\4Ti\u0006$XmV5uQ&\u0013\u0007O\r\u001c)\u00079\ti(A\u0010wKJLg-_*pkJ\u001cWm\u00144gg\u0016$8\u000fU3oI&twm\u0015;bi\u0016$2\u0001QAH\u0011\u001d\t\tj\u0004a\u0001\u0003'\u000b1!\u001b2q!\u0011\t)*!'\u000e\u0005\u0005]%bAA13%!\u00111TAL\u0005)\t\u0005/\u001b,feNLwN\\\u0001!i\u0016\u001cHOR8mY><XM]%t)\"\u0014x\u000e\u001e;mK\u0012|e\u000eT8x\t&\u001c8\u000eK\u0002\u0011\u0003{Bs\u0001EAR\u0003S\u000bY\u000b\u0005\u0003\u0002^\u0005\u0015\u0016\u0002BAT\u0003?\u0012\u0001\u0002R5tC\ndW\rZ\u0001\u0006m\u0006dW/Z\u0011\u0003\u0003[\u000bA\u0004R5tW\u0002\"\bN]8ui2,\u0007%[:!]>$\b%\u00199qY&,G-A\rfqB,7\r^'be.\u0014V\r\u001d7jG\u0006$\u0006N]8ui2,G#\u0002!\u00024\u0006]\u0006BBA[#\u0001\u00071/\u0001\bsKBd\u0017nY1NC:\fw-\u001a:\t\u0011\u0005e\u0016\u0003%AA\u0002q\u000bQ\u0001^5nKN\f1%\u001a=qK\u000e$X*\u0019:l%\u0016\u0004H.[2b)\"\u0014x\u000e\u001e;mK\u0012\"WMZ1vYR$#'\u0006\u0002\u0002@*\u001aA,!1,\u0005\u0005\r\u0007\u0003BAc\u0003\u001fl!!a2\u000b\t\u0005%\u00171Z\u0001\nk:\u001c\u0007.Z2lK\u0012T1!!4C\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003#\f9MA\tv]\u0
00eDWmY6fIZ\u000b'/[1oG\u0016\f\u0001h\u001d5pk2$gj\u001c;GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQ>sg)\u001b:ti\u001a+Go\u00195XSRDGK];oG\u0006$Xm\u00148GKR\u001c\u0007\u000eK\u0002\u0014\u0003{\u0002")
/* loaded from: input_file:kafka/server/link/ClusterLinkFetcherThreadTest.class */
public class ClusterLinkFetcherThreadTest extends ReplicaFetcherThreadTest {
    private final UUID kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId = UUID.randomUUID();
    private final String kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName = "testCluster";
    private ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;

    /**
     * Accessor for the cluster link id field of this test instance.
     *
     * @return the UUID stored in the {@code clusterLinkId} field
     */
    public UUID kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId() {
        final UUID linkId = kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId;
        return linkId;
    }

    /**
     * Accessor for the cluster link name field of this test instance.
     *
     * @return the string stored in the {@code clusterLinkName} field
     */
    public String kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName() {
        final String linkName = kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName;
        return linkName;
    }

    /**
     * Accessor for the fetcher thread currently tracked by this test instance.
     *
     * @return the value of the {@code fetcherThread} field; {@code null} until something
     *         assigns it through the matching {@code _$eq} setter
     */
    public ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread() {
        final ClusterLinkFetcherThread trackedThread = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;
        return trackedThread;
    }

    /**
     * Scala-style setter ({@code fetcherThread_=}) for the fetcher thread tracked by this test.
     *
     * @param clusterLinkFetcherThread the thread to store in the {@code fetcherThread} field;
     *                                 {@link #cleanup()} shuts down whatever non-null value is stored
     */
    public void kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread_$eq(ClusterLinkFetcherThread clusterLinkFetcherThread) {
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread = clusterLinkFetcherThread;
    }

    /**
     * Factory hook overridden from {@code ReplicaFetcherThreadTest}: returns an anonymous
     * {@link ClusterLinkFetcherThread} subclass backed by a mocked {@link ClusterLinkFetcherManager}.
     *
     * <p>In the visible code only {@code str}, {@code kafkaConfig}, {@code failedPartitions},
     * {@code replicaManager}, {@code replicaQuota} and {@code option2} are forwarded to the
     * anonymous class; {@code i}, {@code brokerEndPoint}, {@code metrics}, {@code time},
     * {@code option} and {@code option3} are not referenced.
     *
     * @param str              passed as the first non-outer constructor argument (thread name in the Scala signature)
     * @param i                fetcher id per the Scala signature; unused here
     * @param brokerEndPoint   source broker per the Scala signature; unused here
     * @param kafkaConfig      broker config forwarded to the thread and to {@code ClusterLinkMetadata}
     * @param failedPartitions forwarded to the thread
     * @param replicaManager   forwarded to the thread
     * @param metrics          unused here
     * @param time             unused here
     * @param replicaQuota     forwarded to the thread
     * @param option           tier state fetcher per the Scala signature; unused here
     * @param option2          optional {@link BlockingSend}; when empty a fresh EasyMock mock is used instead
     * @param option3          log context per the Scala signature; unused here
     * @return the newly constructed fetcher thread (it is returned only, not stored in a field here)
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public ReplicaFetcherThread createReplicaFetcherThread(final String str, int i, BrokerEndPoint brokerEndPoint, final KafkaConfig kafkaConfig, final FailedPartitions failedPartitions, final ReplicaManager replicaManager, Metrics metrics, Time time, final ReplicaQuota replicaQuota, Option<TierStateFetcher> option, final Option<BlockingSend> option2, Option<LogContext> option3) {
        // Mocked fetcher manager: partition(...) answers None for any topic-partition, any number of times.
        final ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) EasyMock.mock(ClusterLinkFetcherManager.class);
        EasyMock.expect(clusterLinkFetcherManager.partition((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(None$.MODULE$).anyTimes();
        // Record the Unit-returning call first; the expect(BoxedUnit.UNIT) that follows attaches
        // anyTimes() to that last recorded call (the Java-level form of a Scala expect on a Unit method).
        clusterLinkFetcherManager.updatePartitionFetchState((TopicPartition) EasyMock.anyObject(TopicPartition.class), (FetchState) EasyMock.anyObject(FetchState.class));
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        // Switch the mock from record mode to replay mode before handing it to the thread.
        EasyMock.replay(new Object[]{clusterLinkFetcherManager});
        return new ClusterLinkFetcherThread(this, str, kafkaConfig, clusterLinkFetcherManager, failedPartitions, replicaManager, replicaQuota, option2) { // from class: kafka.server.link.ClusterLinkFetcherThreadTest$$anon$1
            // Overridden as a no-op in this test subclass.
            public void clearPartitionLinkFailure(TopicPartition topicPartition, long j) {
            }

            // Every partition is reported as ready for fetch in this test subclass.
            public boolean isReadyForFetch(TopicPartition topicPartition) {
                return true;
            }

            // NOTE(review): this initializer looks like decompiler residue of the arguments the Scala
            // source passes to the ClusterLinkFetcherThread constructor; none of these locals is used
            // afterwards in the visible code - confirm against ClusterLinkFetcherThreadTest.scala.
            {
                ClusterLinkConfig kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig = this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig();
                ClusterLinkMetadata clusterLinkMetadata = new ClusterLinkMetadata(kafkaConfig, this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName(), this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId(), LinkMode$Destination$.MODULE$, 100L, 60000L);
                BrokerEndPoint brokerEndPoint2 = this.brokerEndPoint();
                Metrics metrics2 = new Metrics();
                SystemTime systemTime = new SystemTime();
                None$ none$ = None$.MODULE$;
                ClusterLinkNetworkClient clusterLinkNetworkClient = (ClusterLinkNetworkClient) EasyMock.mock(ClusterLinkNetworkClient.class);
                // Use the caller-supplied BlockingSend when present, otherwise fall back to a mock.
                BlockingSend blockingSend = option2.isDefined() ? (BlockingSend) option2.get() : (BlockingSend) EasyMock.mock(BlockingSend.class);
                Option $lessinit$greater$default$16 = ClusterLinkFetcherThread$.MODULE$.$lessinit$greater$default$16();
            }
        };
    }

    /**
     * Per-test teardown: shuts down the tracked fetcher thread, if one was set, and then
     * delegates to the superclass cleanup.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @AfterEach
    public void cleanup() {
        final ClusterLinkFetcherThread activeThread = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread();
        final boolean threadWasCreated = activeThread != null;
        if (threadWasCreated) {
            activeThread.shutdown();
        }
        super.cleanup();
    }

    /**
     * Builds a {@link ClusterLinkConfig} whose only explicit property is
     * {@code bootstrap.servers}, set to {@code host:port} of {@code brokerEndPoint()}.
     *
     * @return the config created by {@code ClusterLinkConfig.create} from those properties
     */
    public ClusterLinkConfig kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig() {
        final Properties linkProps = new Properties();
        final String bootstrapServers = brokerEndPoint().host() + ":" + brokerEndPoint().port();
        linkProps.put("bootstrap.servers", bootstrapServers);
        return ClusterLinkConfig$.MODULE$.create(linkProps);
    }

    /**
     * Overrides the inherited test of the same name. With the inter-broker protocol version
     * set to {@code 0.11.0}, this variant asserts that creating a cluster link through
     * {@link ClusterLinkAdminManager} fails with {@link InvalidClusterLinkException}.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        // Alias the Scala companion object so the default-argument accessors stay readable.
        final TestUtils$ scalaTestUtils = TestUtils$.MODULE$;
        final Properties brokerProps = scalaTestUtils.createBrokerConfig(
                1,
                "localhost:1234",
                scalaTestUtils.createBrokerConfig$default$3(),
                scalaTestUtils.createBrokerConfig$default$4(),
                scalaTestUtils.createBrokerConfig$default$5(),
                scalaTestUtils.createBrokerConfig$default$6(),
                scalaTestUtils.createBrokerConfig$default$7(),
                scalaTestUtils.createBrokerConfig$default$8(),
                scalaTestUtils.createBrokerConfig$default$9(),
                scalaTestUtils.createBrokerConfig$default$10(),
                scalaTestUtils.createBrokerConfig$default$11(),
                scalaTestUtils.createBrokerConfig$default$12(),
                scalaTestUtils.createBrokerConfig$default$13(),
                scalaTestUtils.createBrokerConfig$default$14(),
                scalaTestUtils.createBrokerConfig$default$15(),
                scalaTestUtils.createBrokerConfig$default$16(),
                scalaTestUtils.createBrokerConfig$default$17(),
                scalaTestUtils.createBrokerConfig$default$18(),
                scalaTestUtils.createBrokerConfig$default$19(),
                scalaTestUtils.createBrokerConfig$default$20());
        brokerProps.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");

        // The admin manager is built without a ZK client or link manager (both null).
        final KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);
        final ClusterLinkAdminManager adminManager = new ClusterLinkAdminManager(
                brokerConfig, "clusterId", (KafkaZkClient) null, (ClusterLinkManager) null, new Metrics(), new MockTime());

        final NewClusterLink linkRequest = new NewClusterLink(
                "test-link",
                "clusterId",
                (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().empty()).asJava());

        Assertions.assertThrows(InvalidClusterLinkException.class, () -> {
            adminManager.createClusterLink(linkRequest, None$.MODULE$, new ListenerName("EXTERNAL"), false, false, 1000).get();
        });
    }

    /**
     * Runs the source-offsets-pending verification with the latest inter-broker protocol version.
     */
    @Test
    public void testSourceOffsetsPendingState() {
        final ApiVersion latestIbp = ApiVersion$.MODULE$.latestVersion();
        verifySourceOffsetsPendingState(latestIbp);
    }

    /**
     * Runs the source-offsets-pending verification with inter-broker protocol version
     * {@code KAFKA_2_6_IV0}.
     */
    @Test
    public void testSourceOffsetsPendingStateWithIbp26() {
        final ApiVersion ibp26 = KAFKA_2_6_IV0$.MODULE$;
        verifySourceOffsetsPendingState(ibp26);
    }

    /**
     * Checks how the partition's {@code needsLinkedLeaderOffsets} flag evolves for the given
     * inter-broker protocol version.
     *
     * <p>Flow: build a real {@link Partition} around mocked collaborators, create the fetcher
     * manager, add the partition (no fetcher thread may exist before metadata arrives), push a
     * metadata update, and then assert on the flag:
     * <ul>
     *   <li>if {@code ApiVersion.isTruncationOnFetchSupported(apiVersion)}, the flag must already be cleared;</li>
     *   <li>otherwise it must stay set until {@code updateFetchOffsetAndMaybeMarkTruncationComplete}
     *       receives an {@code OffsetTruncationState(10L, true)} for the partition.</li>
     * </ul>
     *
     * <p>The metrics objects are released in {@code finally} blocks so that a failing assertion
     * does not leave them open.
     *
     * @param apiVersion inter-broker protocol version written into the broker config under test
     */
    private void verifySourceOffsetsPendingState(ApiVersion apiVersion) {
        MockTime mockTime = new MockTime();
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        LogManager logManager = (LogManager) EasyMock.createNiceMock(LogManager.class);
        Partition partition = new Partition(topicPartition, 10000L, ApiVersion$.MODULE$.latestVersion(), 0, mockTime, (IsrChangeListener) EasyMock.createNiceMock(IsrChangeListener.class), (DelayedOperations) EasyMock.createNiceMock(DelayedOperations.class), MetadataCache$.MODULE$.zkMetadataCache(0, MetadataCache$.MODULE$.zkMetadataCache$default$2()), logManager, None$.MODULE$, None$.MODULE$, None$.MODULE$, TestUtils$.MODULE$.createAlterIsrManager());
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        partition.log_$eq(new Some(abstractLog));

        // Broker config with the inter-broker protocol version under test.
        TestUtils$ scalaTestUtils = TestUtils$.MODULE$;
        Properties createBrokerConfig = scalaTestUtils.createBrokerConfig(
                1,
                "localhost:1234",
                scalaTestUtils.createBrokerConfig$default$3(),
                scalaTestUtils.createBrokerConfig$default$4(),
                scalaTestUtils.createBrokerConfig$default$5(),
                scalaTestUtils.createBrokerConfig$default$6(),
                scalaTestUtils.createBrokerConfig$default$7(),
                scalaTestUtils.createBrokerConfig$default$8(),
                scalaTestUtils.createBrokerConfig$default$9(),
                scalaTestUtils.createBrokerConfig$default$10(),
                scalaTestUtils.createBrokerConfig$default$11(),
                scalaTestUtils.createBrokerConfig$default$12(),
                scalaTestUtils.createBrokerConfig$default$13(),
                scalaTestUtils.createBrokerConfig$default$14(),
                scalaTestUtils.createBrokerConfig$default$15(),
                scalaTestUtils.createBrokerConfig$default$16(),
                scalaTestUtils.createBrokerConfig$default$17(),
                scalaTestUtils.createBrokerConfig$default$18(),
                scalaTestUtils.createBrokerConfig$default$19(),
                scalaTestUtils.createBrokerConfig$default$20());
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), apiVersion.shortVersion());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);

        // Replica manager mock that resolves the test partition and its log.
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(topicPartition)).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.onlinePartition(topicPartition)).andReturn(new Some(partition)).anyTimes();

        // Record close() on the BlockingSend mock, expected once (Unit-returning call, hence the
        // expect(BoxedUnit.UNIT) that binds to the last recorded invocation).
        BlockingSend blockingSend = (BlockingSend) EasyMock.createNiceMock(BlockingSend.class);
        blockingSend.close();
        EasyMock.expect(BoxedUnit.UNIT).once();

        ClusterLinkDestConnectionManager clusterLinkDestConnectionManager = (ClusterLinkDestConnectionManager) EasyMock.createNiceMock(ClusterLinkDestConnectionManager.class);
        EasyMock.expect(clusterLinkDestConnectionManager.reverseConnectionProvider((NetworkClient) EasyMock.anyObject(), (Option) EasyMock.anyObject(), (String) EasyMock.anyObject())).andReturn(None$.MODULE$).anyTimes();
        // NOTE(review): clusterLinkDestConnectionManager is not part of this replay call, so it
        // stays in record mode - kept as in the original; confirm this is intentional.
        EasyMock.replay(new Object[]{replicaManager, logManager, abstractLog, blockingSend});

        Metrics metrics = new Metrics();
        try {
            ClusterLinkMetrics clusterLinkMetrics = new ClusterLinkMetrics(kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName(), kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId(), LinkMode$Destination$.MODULE$, (ClusterLinkManager) null, None$.MODULE$, metrics, None$.MODULE$);
            clusterLinkMetrics.startup();
            try {
                // The $$anon$2 class is defined outside this file; presumably it is the fetcher
                // manager that publishes the created thread via fetcherThread_$eq - verify.
                ClusterLinkFetcherThreadTest$$anon$2 fetcherManager = new ClusterLinkFetcherThreadTest$$anon$2(this, clusterLinkDestConnectionManager, fromProps, replicaManager, clusterLinkMetrics, mockTime, metrics, blockingSend);
                fetcherManager.initializeMetadata();
                // Force a leader epoch on the partition via reflection before linking it.
                TestUtils.setFieldValue(partition, "leaderEpoch", BoxesRunTime.boxToInteger(2));
                fetcherManager.addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition})));
                Assertions.assertNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread created without metadata");

                // Deliver metadata for "topic" and notify the manager.
                fetcherManager.currentMetadata().update(1, RequestTestUtils.metadataUpdateWith("cluster", 1, Collections.singletonMap("topic", Errors.NONE), Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(1)), topicPartition2 -> {
                    return Predef$.MODULE$.int2Integer(1);
                }, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion()), false, mockTime.milliseconds());
                fetcherManager.onNewMetadata(fetcherManager.currentMetadata().fetch());
                Assertions.assertNotNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread not created");

                if (ApiVersion$.MODULE$.isTruncationOnFetchSupported(apiVersion)) {
                    Assertions.assertFalse(offsetsPending$1(partition), "State not reset for IBP " + apiVersion);
                } else {
                    Assertions.assertTrue(offsetsPending$1(partition), "State reset before fetching offsets");
                    // Empty update: nothing known about the partition yet.
                    kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete(Predef$.MODULE$.Map().empty());
                    Assertions.assertTrue(offsetsPending$1(partition), "State reset before source offsets available");
                    // Offset known, second constructor argument false: flag must remain set.
                    kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetTruncationState(10L, false))})));
                    Assertions.assertTrue(offsetsPending$1(partition), "State reset before truncation");
                    // Same offset with the second argument true: flag must now be cleared.
                    kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetTruncationState(10L, true))})));
                    Assertions.assertFalse(offsetsPending$1(partition), "State not reset after truncation");
                }
            } finally {
                // Runs on assertion failure as well; same order as the original success path.
                clusterLinkMetrics.shutdown();
            }
        } finally {
            metrics.close();
        }
    }

    /**
     * Re-declares the inherited low-disk throttling test so that it can be switched off for
     * this subclass: the {@code @Disabled} reason is "Disk throttle is not applied". The body
     * simply delegates to the superclass test.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @Disabled("Disk throttle is not applied")
    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        super.testFollowerIsThrottledOnLowDisk();
    }

    /**
     * Records the throttle expectation on the given replica manager mock. This override
     * expects {@code markClusterLinkReplicaThrottle()} any number of times.
     *
     * @param replicaManager mock (still in record mode) on which the expectation is recorded
     * @param i              requested invocation count; not used by this override
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public void expectMarkReplicaThrottle(ReplicaManager replicaManager, int i) {
        // Record the void call, then relax the count on that last recorded call.
        replicaManager.markClusterLinkReplicaThrottle();
        EasyMock.expectLastCall().anyTimes();
    }

    /**
     * Scala default-argument accessor for the second parameter of
     * {@code expectMarkReplicaThrottle}.
     *
     * @return the default invocation count, {@code 1}
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public int expectMarkReplicaThrottle$default$2() {
        final int defaultTimes = 1;
        return defaultTimes;
    }

    /**
     * Overrides the inherited test to call {@code verifyFetchLeaderEpochOnFirstFetch} with the
     * latest inter-broker protocol version and a second argument of {@code 1}.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        final ApiVersion latestIbp = ApiVersion$.MODULE$.latestVersion();
        verifyFetchLeaderEpochOnFirstFetch(latestIbp, 1);
    }

    /**
     * Reads the {@code needsLinkedLeaderOffsets} field of the partition reflectively.
     *
     * @param partition partition whose field is inspected
     * @return the field's value unboxed to {@code boolean}
     */
    private static final boolean offsetsPending$1(Partition partition) {
        final Object rawFlag = TestUtils.fieldValue(partition, Partition.class, "needsLinkedLeaderOffsets");
        return BoxesRunTime.unboxToBoolean(rawFlag);
    }
}
