package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import kafka.api.PartitionLinkState;
import kafka.cluster.AlterPartitionListener;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.ClusterLinkState;
import kafka.cluster.CommittedPartitionState;
import kafka.cluster.DelayedOperations;
import kafka.cluster.Partition;
import kafka.cluster.Partition$;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.BlockingSend;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerFeatures$;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.FetcherPool;
import kafka.server.FetcherPool$Default$;
import kafka.server.FetcherPool$InSync$;
import kafka.server.FetcherTag;
import kafka.server.Fetching$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache$;
import kafka.server.OffsetTruncationState;
import kafka.server.PartitionFetchState;
import kafka.server.PartitionFetchState$;
import kafka.server.PausedPartitions;
import kafka.server.RemoteLeaderEndPoint;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaFetcherThreadTest;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.metadata.ZkMetadataCache;
import kafka.testkit.TestKitNodes;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.CoreUtils$;
import kafka.utils.Logging;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MirrorTopicError;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.InvalidClusterLinkException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.LogAppendInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Enumeration;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.DoubleRef;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

/* compiled from: ClusterLinkFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0015Mh\u0001B6m\u0001MDQ\u0001\u001f\u0001\u0005\u0002eDq\u0001 \u0001C\u0002\u0013EQ\u0010C\u0004\u0002\u0014\u0001\u0001\u000b\u0011\u0002@\t\u0013\u0005U\u0001A1A\u0005\u0012\u0005]\u0001\u0002CA\u0015\u0001\u0001\u0006I!!\u0007\t\u0013\u0005-\u0002A1A\u0005\n\u00055\u0002\u0002CA\u001e\u0001\u0001\u0006I!a\f\t\u0017\u0005u\u0002\u00011AA\u0002\u0013E\u0011q\b\u0005\f\u0003\u000f\u0002\u0001\u0019!a\u0001\n#\tI\u0005C\u0006\u0002V\u0001\u0001\r\u0011!Q!\n\u0005\u0005\u0003\"CA,\u0001\t\u0007I\u0011BA\u0017\u0011!\tI\u0006\u0001Q\u0001\n\u0005=\u0002bCA.\u0001\u0001\u0007\t\u0019!C\u0005\u0003;B1\"!\u001a\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002h!Y\u00111\u000e\u0001A\u0002\u0003\u0005\u000b\u0015BA0\u0011%\ti\u0007\u0001b\u0001\n\u0013\ty\u0007\u0003\u0005\u0002\u0002\u0002\u0001\u000b\u0011BA9\u0011%\t\u0019\t\u0001a\u0001\n#\t)\tC\u0005\u0002\u000e\u0002\u0001\r\u0011\"\u0005\u0002\u0010\"A\u00111\u0013\u0001!B\u0013\t9\t\u0003\u0005\u0002\u0016\u0002\u0001\r\u0011\"\u0005~\u0011%\t9\n\u0001a\u0001\n#\tI\nC\u0004\u0002\u001e\u0002\u0001\u000b\u0015\u0002@\t\u0013\u0005}\u0005\u00011A\u0005\u0012\u0005\u0005\u0006\"CAa\u0001\u0001\u0007I\u0011CAb\u0011!\t9\r\u0001Q!\n\u0005\r\u0006\"CAe\u0001\u0001\u0007I\u0011CAf\u0011%\t\u0019\u0010\u0001a\u0001\n#\t)\u0010\u0003\u0005\u0002z\u0002\u0001\u000b\u0015BAg\u0011%\tY\u0010\u0001a\u0001\n#\ti\u0003C\u0005\u0002~\u0002\u0001\r\u0011\"\u0005\u0002��\"A!1\u0001\u0001!B\u0013\ty\u0003C\u0005\u0003\u0006\u0001\u0001\r\u0011\"\u0005\u0003\b!I!Q\u0006\u0001A\u0002\u0013E!q\u0006\u0005\t\u0005g\u0001\u0001\u0015)\u0003\u0003\n!I!Q\b\u0001A\u0002\u0013%\u0011Q\u0011\u0005\n\u0005\u007f\u0001\u0001\u0019!C\u0005\u0005\u0003B\u0001B!\u0012\u0001A\u0003&\u0011q\u0011\u0005\n\u0005\u000f\u0002!\u0019!C\t\u0005\u0013B\u0001B!\u0015\u0001A\u0003%!1\n\u0005\n\u0005'\u0002!\u0019!C\t\u0005+B\u0001B!\u0018\u0001A\u0003%!q\u000b\u0005\n\u0005?\u0002!\u0019!C\t\u0005CB\u0001B!\u001b\u0001A\u0
003%!1\r\u0005\n\u0005W\u0002!\u0019!C\t\u0005[B\u0001B!\u001e\u0001A\u0003%!q\u000e\u0005\f\u0005o\u0002\u0001\u0019!a\u0001\n\u0013\u0011I\bC\u0006\u0003\u0002\u0002\u0001\r\u00111A\u0005\n\t\r\u0005b\u0003BD\u0001\u0001\u0007\t\u0011)Q\u0005\u0005wBqA!#\u0001\t#\u0012Y\tC\u0005\u0003T\u0002\t\n\u0011\"\u0005\u0003V\"I!1\u001e\u0001\u0012\u0002\u0013E!Q\u001e\u0005\b\u0005c\u0004A\u0011\u000bBz\u0011\u001d\u0019\t\b\u0001C\t\u0007gBqaa\u001f\u0001\t\u0003\u0019i\bC\u0004\u0004\u0016\u0002!\te! \t\u000f\r}\u0005\u0001\"\u0003\u0004\"\"911\u0017\u0001\u0005B\ru\u0004bBB_\u0001\u0011\u00051Q\u0010\u0005\b\u0007\u0003\u0004A\u0011AB?\u0011\u001d\u0019)\r\u0001C\u0005\u0007\u000fDqaa<\u0001\t\u0003\u0019i\bC\u0004\u0004t\u0002!\ta! \t\u000f\r]\b\u0001\"\u0003\u0004z\"9A1\u0002\u0001\u0005\n\u00115\u0001\"\u0003C!\u0001E\u0005I\u0011\u0002C\"\u0011%!9\u0005AI\u0001\n\u0013!I\u0005C\u0005\u0005N\u0001\t\n\u0011\"\u0003\u0005D!9Aq\n\u0001\u0005\u0002\u0011E\u0003b\u0002C.\u0001\u0011\u00053Q\u0010\u0005\b\t[\u0002A\u0011\tC8\u0011%!9\bAI\u0001\n\u0003!I\bC\u0004\u0005~\u0001!\te! \t\u000f\u0011\u0005\u0005\u0001\"\u0001\u0004~!9AQ\u0011\u0001\u0005\u0002\ru\u0004b\u0002CE\u0001\u0011\u00051Q\u0010\u0005\b\t\u001b\u0003A\u0011AB?\u0011\u001d!\t\n\u0001C\u0005\t'Cq\u0001\"7\u0001\t\u0003!Y\u000eC\u0004\u0005r\u0002!\t\u0005b=\t\u000f\u0015\u0005\u0001\u0001\"\u0001\u0004~!9QQ\u0001\u0001\u0005\u0002\ru\u0004bBC\u0005\u0001\u0011\u00051Q\u0010\u0005\b\u000b\u001b\u0001A\u0011AB?\u0011\u001d)\t\u0002\u0001C\u0005\u000b'Aq!\"\b\u0001\t\u0003\u0019i\bC\u0004\u0006\"\u0001!\ta! \t\u000f\u0015\u0015\u0002\u0001\"\u0003\u0006(!9Q1\u0006\u0001\u0005\u0002\ru\u0004bBC\u0018\u0001\u0011\u00051Q\u0010\u0005\b\u000bg\u0001A\u0011AB?\u0011\u001d)9\u0004\u0001C\u0001\u0007{Bq!b\u000f\u0001\t\u0013)i\u0004C\u0004\u0006b\u0001!\ta! 
\t\u000f\u0015\u0015\u0004\u0001\"\u0001\u0004~!9Q\u0011\u000e\u0001\u0005\u0002\ru\u0004bBC7\u0001\u0011\u00051Q\u0010\u0005\b\u000bc\u0002A\u0011IB?\u0011\u001d))\b\u0001C\u0005\u000boB\u0011\"\"\"\u0001#\u0003%I\u0001b\u0011\t\u0013\u0015\u001d\u0005!%A\u0005\n\u0011\r\u0003bBCE\u0001\u0011ES1\u0012\u0005\b\u000bc\u0003A\u0011BCZ\u0011%)9\rAI\u0001\n\u0013)I\rC\u0004\u0006N\u0002!I!b4\t\u000f\u0015\u0015\b\u0001\"\u0003\u0006h\na2\t\\;ti\u0016\u0014H*\u001b8l\r\u0016$8\r[3s)\"\u0014X-\u00193UKN$(BA7o\u0003\u0011a\u0017N\\6\u000b\u0005=\u0004\u0018AB:feZ,'OC\u0001r\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001;\u0011\u0005U4X\"\u00018\n\u0005]t'\u0001\u0007*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\rV3ti\u00061A(\u001b8jiz\"\u0012A\u001f\t\u0003w\u0002i\u0011\u0001\\\u0001\u000eG2,8\u000f^3s\u0019&t7.\u00133\u0016\u0003y\u00042a`A\b\u001b\t\t\tA\u0003\u0003\u0002\u0004\u0005\u0015\u0011AB2p[6|gNC\u0002r\u0003\u000fQA!!\u0003\u0002\f\u00051\u0011\r]1dQ\u0016T!!!\u0004\u0002\u0007=\u0014x-\u0003\u0003\u0002\u0012\u0005\u0005!\u0001B+vS\u0012\fab\u00197vgR,'\u000fT5oW&#\u0007%A\bdYV\u001cH/\u001a:MS:\\g*Y7f+\t\tI\u0002\u0005\u0003\u0002\u001c\u0005\u0015RBAA\u000f\u0015\u0011\ty\"!\t\u0002\t1\fgn\u001a\u0006\u0003\u0003G\tAA[1wC&!\u0011qEA\u000f\u0005\u0019\u0019FO]5oO\u0006\u00012\r\\;ti\u0016\u0014H*\u001b8l\u001d\u0006lW\rI\u0001\u0015G2,8\u000f^3s\u0019&t7NQ1dW>4g-T:\u0016\u0005\u0005=\u0002\u0003BA\u0019\u0003oi!!a\r\u000b\u0005\u0005U\u0012!B:dC2\f\u0017\u0002BA\u001d\u0003g\u00111!\u00138u\u0003U\u0019G.^:uKJd\u0015N\\6CC\u000e\\wN\u001a4Ng\u0002\n\u0011c\u00197vgR,'\u000fT5oW\u000e{gNZ5h+\t\t\t\u0005E\u0002|\u0003\u0007J1!!\u0012m\u0005E\u0019E.^:uKJd\u0015N\\6D_:4\u0017nZ\u0001\u0016G2,8\u000f^3s\u0019&t7nQ8oM&<w\fJ3r)\u0011\tY%!\u0015\u0011\t\u0005E\u0012QJ\u0005\u0005\u0003\u001f\n\u0019D\u0001\u0003V]&$\b\"CA*\u0013\u0005\u0005\t\u0019AA!\u0003\rAH%M\u0001\u0013G2,8\u000f^3s\u0019&t7nQ8oM&<\u0007%A\u0016qCJ$\u0018\u000e^5p]2\u000bwmZ5oORC'o\u001
c;uY\u0016|%/T5he\u0006$XmV1jiRKW.Z't\u00031\u0002\u0018M\u001d;ji&|g\u000eT1hO&tw\r\u00165s_R$H.Z(s\u001b&<'/\u0019;f/\u0006LG\u000fV5nK6\u001b\b%A\u0007gKR\u001c\u0007.\u001a:UQJ,\u0017\rZ\u000b\u0003\u0003?\u00022a_A1\u0013\r\t\u0019\u0007\u001c\u0002\u0019\u00072,8\u000f^3s\u0019&t7NR3uG\",'\u000f\u00165sK\u0006$\u0017!\u00054fi\u000eDWM\u001d+ie\u0016\fGm\u0018\u0013fcR!\u00111JA5\u0011%\t\u0019FDA\u0001\u0002\u0004\ty&\u0001\bgKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u0011\u0002#\u0005dGNZ3uG\",'\u000f\u00165sK\u0006$7/\u0006\u0002\u0002rA1\u00111OA?\u0003?j!!!\u001e\u000b\t\u0005]\u0014\u0011P\u0001\b[V$\u0018M\u00197f\u0015\u0011\tY(a\r\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002��\u0005U$A\u0003'jgR\u0014UO\u001a4fe\u0006\u0011\u0012\r\u001c7gKR\u001c\u0007.\u001a:UQJ,\u0017\rZ:!\u0003%I7\u000fR3mCf,G-\u0006\u0002\u0002\bB!\u0011\u0011GAE\u0013\u0011\tY)a\r\u0003\u000f\t{w\u000e\\3b]\u0006i\u0011n\u001d#fY\u0006LX\rZ0%KF$B!a\u0013\u0002\u0012\"I\u00111K\n\u0002\u0002\u0003\u0007\u0011qQ\u0001\u000bSN$U\r\\1zK\u0012\u0004\u0013!D:pkJ\u001cW\rV8qS\u000eLE-A\tt_V\u00148-\u001a+pa&\u001c\u0017\nZ0%KF$B!a\u0013\u0002\u001c\"A\u00111\u000b\f\u0002\u0002\u0003\u0007a0\u0001\bt_V\u00148-\u001a+pa&\u001c\u0017\n\u001a\u0011\u0002/1,\u0017\rZ3s\u000b:$\u0007k\\5oi\u0016C8-\u001a9uS>tWCAAR!\u0019\t\t$!*\u0002*&!\u0011qUA\u001a\u0005\u0019y\u0005\u000f^5p]B!\u00111VA^\u001d\u0011\ti+a.\u000f\t\u0005=\u0016QW\u0007\u0003\u0003cS1!a-s\u0003\u0019a$o\\8u}%\u0011\u0011QG\u0005\u0005\u0003s\u000b\u0019$A\u0004qC\u000e\\\u0017mZ3\n\t\u0005u\u0016q\u0018\u0002\n\u000bb\u001cW\r\u001d;j_:TA!!/\u00024\u0005YB.Z1eKJ,e\u000e\u001a)pS:$X\t_2faRLwN\\0%KF$B!a\u0013\u0002F\"I\u00111K\r\u0002\u0002\u0003\u0007\u00111U\u0001\u0019Y\u0016\fG-\u001a:F]\u0012\u0004v.\u001b8u\u000bb\u001cW\r\u001d;j_:\u0004\u0013aD3q_\u000eDWI\u001c3PM\u001a\u001cX\r^:\u0016\u0005\u00055\u0007\u0003CAh\u0003+\fI.a8\u000e\u0005\u0005E'\u0002BAj\u0003s\n\u0011\"[7nkR\f'\r\\3\n\t\u0005]\u0017\u0011\u001b
\u0002\u0004\u001b\u0006\u0004\bcA@\u0002\\&!\u0011Q\\A\u0001\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:\u0004B!!9\u0002n:!\u00111]Au\u001b\t\t)O\u0003\u0003\u0002h\u0006\u0005\u0011aB7fgN\fw-Z\u0005\u0005\u0003W\f)/\u0001\u0011PM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fgB|gn]3ECR\f\u0017\u0002BAx\u0003c\u0014a\"\u00129pG\",e\u000eZ(gMN,GO\u0003\u0003\u0002l\u0006\u0015\u0018aE3q_\u000eDWI\u001c3PM\u001a\u001cX\r^:`I\u0015\fH\u0003BA&\u0003oD\u0011\"a\u0015\u001d\u0003\u0003\u0005\r!!4\u0002!\u0015\u0004xn\u00195F]\u0012|eMZ:fiN\u0004\u0013!E7j]&s7/\u001f8d%\u0016\u0004H.[2bg\u0006)R.\u001b8J]NLhn\u0019*fa2L7-Y:`I\u0015\fH\u0003BA&\u0005\u0003A\u0011\"a\u0015 \u0003\u0003\u0005\r!a\f\u0002%5Lg.\u00138ts:\u001c'+\u001a9mS\u000e\f7\u000fI\u0001\u000eY\u0006$Xm\u001d;PM\u001a\u001cX\r^:\u0016\u0005\t%\u0001\u0003CAh\u0003+\fINa\u0003\u0011\t\t5!q\u0005\b\u0005\u0005\u001f\u0011\u0019C\u0004\u0003\u0003\u0012\t\u0005b\u0002\u0002B\n\u0005?qAA!\u0006\u0003\u001e9!!q\u0003B\u000e\u001d\u0011\tyK!\u0007\n\u0005\u00055\u0011\u0002BA\u0005\u0003\u0017I1!]A\u0004\u0013\u0011\t\u0019!!\u0002\n\t\u0005\u001d\u0018\u0011A\u0005\u0005\u0005K\t)/A\fMSN$xJ\u001a4tKR\u001c(+Z:q_:\u001cX\rR1uC&!!\u0011\u0006B\u0016\u0005qa\u0015n\u001d;PM\u001a\u001cX\r^:QCJ$\u0018\u000e^5p]J+7\u000f]8og\u0016TAA!\n\u0002f\u0006\tB.\u0019;fgR|eMZ:fiN|F%Z9\u0015\t\u0005-#\u0011\u0007\u0005\n\u0003'\u0012\u0013\u0011!a\u0001\u0005\u0013\ta\u0002\\1uKN$xJ\u001a4tKR\u001c\b\u0005K\u0002$\u0005o\u0001B!!\r\u0003:%!!1HA\u001a\u0005!1x\u000e\\1uS2,\u0017!\u0004:fC\u0012Lhi\u001c:GKR\u001c\u0007.A\tsK\u0006$\u0017PR8s\r\u0016$8\r[0%KF$B!a\u0013\u0003D!I\u00111K\u0013\u0002\u0002\u0003\u0007\u0011qQ\u0001\u000fe\u0016\fG-\u001f$pe\u001a+Go\u00195!\u0003E1W\r^2i%\u0016\u001c\bo\u001c8tKNK'0Z\u000b\u0003\u0005\u0017\u00022a\u001fB'\u0013\r\u0011y\u0005\u001c\u0002\u0012\r\u0016$8\r\u001b*fgB|gn]3TSj,\u0017A\u00054fi\u000eD'+Z:q_:\u001cXmU5{K\u0002\n!c\u00197vgR,'\u000fT5oW6+GO]5dgV\u0011!q\u000b\
t\u0004w\ne\u0013b\u0001B.Y\n\u00112\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$(/[2t\u0003M\u0019G.^:uKJd\u0015N\\6NKR\u0014\u0018nY:!\u0003]\u0019X\r\\3di>\u0014X*\u001a;sS\u000e\u001c(+Z4jgR\u0014\u00180\u0006\u0002\u0003dA\u00191P!\u001a\n\u0007\t\u001dDN\u0001\u0012DYV\u001cH/\u001a:MS:\\7+\u001a7fGR|'/T3ue&\u001c7OU3hSN$(/_\u0001\u0019g\u0016dWm\u0019;pe6+GO]5dgJ+w-[:uef\u0004\u0013aC2p]:l\u0015M\\1hKJ,\"Aa\u001c\u0011\u0007m\u0014\t(C\u0002\u0003t1\u0014Ae\u00117vgR,'\u000fT5oW>+HOY8v]\u0012\u001cuN\u001c8fGRLwN\\'b]\u0006<WM]\u0001\rG>tg.T1oC\u001e,'\u000fI\u0001\u000fe\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s+\t\u0011Y\bE\u0002v\u0005{J1Aa o\u00059\u0011V\r\u001d7jG\u0006l\u0015M\\1hKJ\f!C]3qY&\u001c\u0017-T1oC\u001e,'o\u0018\u0013fcR!\u00111\nBC\u0011%\t\u0019\u0006MA\u0001\u0002\u0004\u0011Y(A\bsKBd\u0017nY1NC:\fw-\u001a:!\u0003i\u0019'/Z1uKJ+Wn\u001c;f\u0019\u0016\fG-\u001a:F]\u0012\u0004v.\u001b8u)9\u0011iIa%\u0003\u001e\n\u0005&1\u0016B[\u0005\u000f\u00042!\u001eBH\u0013\r\u0011\tJ\u001c\u0002\u0015%\u0016lw\u000e^3MK\u0006$WM]#oIB{\u0017N\u001c;\t\u000f\tU%\u00071\u0001\u0003\u0018\u0006a!M]8lKJ\u001cuN\u001c4jOB\u0019QO!'\n\u0007\tmeNA\u0006LC\u001a\\\u0017mQ8oM&<\u0007b\u0002BPe\u0001\u0007!1P\u0001\u000be\u0016\u0004H.[2b\u001b\u001e\u0014\bb\u0002BRe\u0001\u0007!QU\u0001\u0006cV|G/\u0019\t\u0004k\n\u001d\u0016b\u0001BU]\na!+\u001a9mS\u000e\f\u0017+^8uC\"9!Q\u0016\u001aA\u0002\t=\u0016A\u00077fC\u0012,'/\u00128ea>Lg\u000e\u001e\"m_\u000e\\\u0017N\\4TK:$\u0007cA;\u00032&\u0019!1\u00178\u0003\u0019\tcwnY6j]\u001e\u001cVM\u001c3\t\u0013\t]&\u0007%AA\u0002\te\u0016!\u00047pO\u000e{g\u000e^3yi>\u0003H\u000f\u0005\u0004\u00022\u0005\u0015&1\u0018\t\u0005\u0005{\u0013\u0019-\u0004\u0002\u0003@*!!\u0011YA\u0001\u0003\u0015)H/\u001b7t\u0013\u0011\u0011)Ma0\u0003\u00151{wmQ8oi\u0016DH\u000fC\u0005\u0003JJ\u0002\n\u00111\u0001\u0003L\u00069A/[7f\u001fB$\bCBA\u0019\u0003K\u0013i\r\u0005\u0003\u0003>\n=\u0017\u0002\u0002Bi\u0005\u007f\u0013A\u0001V5nK\u0006!3M]3bi
\u0016\u0014V-\\8uK2+\u0017\rZ3s\u000b:$\u0007k\\5oi\u0012\"WMZ1vYR$S'\u0006\u0002\u0003X*\"!\u0011\u0018BmW\t\u0011Y\u000e\u0005\u0003\u0003^\n\u001dXB\u0001Bp\u0015\u0011\u0011\tOa9\u0002\u0013Ut7\r[3dW\u0016$'\u0002\u0002Bs\u0003g\t!\"\u00198o_R\fG/[8o\u0013\u0011\u0011IOa8\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-\u0001\u0013de\u0016\fG/\u001a*f[>$X\rT3bI\u0016\u0014XI\u001c3Q_&tG\u000f\n3fM\u0006,H\u000e\u001e\u00137+\t\u0011yO\u000b\u0003\u0003L\ne\u0017AG2sK\u0006$XMU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$GC\bB{\u0005w\u001cia!\u0005\u0004\"\r\r2QFB\u001c\u0007\u0003\u001a\u0019e!\u0015\u0004V\r]3\u0011LB8!\r)(q_\u0005\u0004\u0005st'\u0001\u0006*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\rC\u0004\u0003~V\u0002\rAa@\u0002\t9\fW.\u001a\t\u0005\u0007\u0003\u0019IA\u0004\u0003\u0004\u0004\r\u0015\u0001\u0003BAX\u0003gIAaa\u0002\u00024\u00051\u0001K]3eK\u001aLA!a\n\u0004\f)!1qAA\u001a\u0011\u001d\u0019y!\u000ea\u0001\u0003_\t\u0011BZ3uG\",'/\u00133\t\u000f\rMQ\u00071\u0001\u0004\u0016\u0005a1o\\;sG\u0016\u0014%o\\6feB!1qCB\u000f\u001b\t\u0019IBC\u0002\u0004\u001cA\fqa\u00197vgR,'/\u0003\u0003\u0004 
\re!A\u0004\"s_.,'/\u00128e!>Lg\u000e\u001e\u0005\b\u0005++\u0004\u0019\u0001BL\u0011\u001d\u0019)#\u000ea\u0001\u0007O\t\u0001CZ1jY\u0016$\u0007+\u0019:uSRLwN\\:\u0011\u0007U\u001cI#C\u0002\u0004,9\u0014\u0001CR1jY\u0016$\u0007+\u0019:uSRLwN\\:\t\u000f\r=R\u00071\u0001\u00042\u0005\u0001\u0002/Y;tK\u0012\u0004\u0016M\u001d;ji&|gn\u001d\t\u0004k\u000eM\u0012bAB\u001b]\n\u0001\u0002+Y;tK\u0012\u0004\u0016M\u001d;ji&|gn\u001d\u0005\b\u0007s)\u0004\u0019AB\u001e\u0003I)\u0007\u0010]8oK:$\u0018.\u00197CC\u000e\\wN\u001a4\u0011\t\tu6QH\u0005\u0005\u0007\u007f\u0011yL\u0001\nFqB|g.\u001a8uS\u0006d')Y2l_\u001a4\u0007b\u0002BPk\u0001\u0007!1\u0010\u0005\b\u0007\u000b*\u0004\u0019AB$\u0003\u001diW\r\u001e:jGN\u0004Ba!\u0013\u0004N5\u001111\n\u0006\u0005\u0007\u000b\n\t!\u0003\u0003\u0004P\r-#aB'fiJL7m\u001d\u0005\b\u0007'*\u0004\u0019\u0001Bg\u0003\u0011!\u0018.\\3\t\u000f\t\rV\u00071\u0001\u0003&\"9!QV\u001bA\u0002\t=\u0006bBB.k\u0001\u00071QL\u0001\u0011i&,'o\u0015;bi\u00164U\r^2iKJ\u0004b!!\r\u0002&\u000e}\u0003\u0003BB1\u0007Wj!aa\u0019\u000b\t\r\u00154qM\u0001\bM\u0016$8\r[3s\u0015\r\u0019I\u0007]\u0001\u0005i&,'/\u0003\u0003\u0004n\r\r$\u0001\u0005+jKJ\u001cF/\u0019;f\r\u0016$8\r[3s\u0011%\u00119,\u000eI\u0001\u0002\u0004\u0011I,\u0001\u000bde\u0016\fG/\u001a$fi\u000eDWM]'b]\u0006<WM\u001d\u000b\u0003\u0007k\u00022a_B<\u0013\r\u0019I\b\u001c\u0002\u001a\u00072,8\u000f^3s\u0019&t7NR3uG\",'/T1oC\u001e,'/A\u0003tKR,\b\u000f\u0006\u0002\u0002L!\u001aqg!!\u0011\t\r\r5\u0011S\u0007\u0003\u0007\u000bSAaa\"\u0004\n\u0006\u0019\u0011\r]5\u000b\t\r-5QR\u0001\bUV\u0004\u0018\u000e^3s\u0015\u0011\u0019y)a\u0003\u0002\u000b),h.\u001b;\n\t\rM5Q\u0011\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017aB2mK\u0006tW\u000f\u001d\u0015\u0004q\re\u0005\u0003BBB\u00077KAa!(\u0004\u0006\nI\u0011I\u001a;fe\u0016\u000b7\r[\u0001\u0018GJ,\u0017\r^3DYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e$B!!\u0011\u0004$\"91QU\u001dA\u0002\r\u001d\u0016!D8wKJ\u0014\u0018\u000eZ3Qe>\u00048\u000f\u0005\u0003\u0004*\u00
0e=VBABV\u0015\u0011\u0019i+!\t\u0002\tU$\u0018\u000e\\\u0005\u0005\u0007c\u001bYK\u0001\u0006Qe>\u0004XM\u001d;jKN\f1g\u001d5pk2$Wk]3MK\u0006$WM]#oI>3gm]3u\u0013\u001aLe\u000e^3s\u0005J|7.\u001a:WKJ\u001c\u0018n\u001c8CK2|wO\r\u0019)\u0007i\u001a9\f\u0005\u0003\u0004\u0004\u000ee\u0016\u0002BB^\u0007\u000b\u0013A\u0001V3ti\u0006iB/Z:u!\u0016tG-\u001b8h\u0019><WI\u001c3PM\u001a\u001cX\r^+qI\u0006$X\rK\u0002<\u0007o\u000b\u0001\u0005^3ti\u001a+Go\u00195SKF,Xm\u001d;QCJ$\u0018\u000e^5p]6\u000b\u0007pU5{K\"\u001aAha.\u0002\u0013\u0019,Go\u00195ECR\fG\u0003BBe\u0007C\u0004\u0002b!+\u0004L\u0006e7QZ\u0005\u0005\u0003/\u001cY\u000b\u0005\u0003\u0004P\u000emg\u0002BBi\u0007/l!aa5\u000b\t\rU\u0017\u0011A\u0001\te\u0016\fX/Z:ug&!1\u0011\\Bj\u000311U\r^2i%\u0016\fX/Z:u\u0013\u0011\u0019ina8\u0003\u001bA\u000b'\u000f^5uS>tG)\u0019;b\u0015\u0011\u0019Ina5\t\u000f\r\rX\b1\u0001\u0004f\u0006a\u0001/\u0019:uSRLwN\\'baBA1\u0011ABt\u00033\u001cI/\u0003\u0003\u0002X\u000e-\u0001cA;\u0004l&\u00191Q\u001e8\u0003'A\u000b'\u000f^5uS>tg)\u001a;dQN#\u0018\r^3\u0002;Q,7\u000f^*pkJ\u001cWm\u00144gg\u0016$8\u000fU3oI&twm\u0015;bi\u0016D3APB\\\u0003\u0019\"Xm\u001d;T_V\u00148-Z(gMN,Go\u001d)f]\u0012LgnZ*uCR,w+\u001b;i\u0013\n\u0004(G\u000e\u0015\u0004\u007f\r]\u0016a\b<fe&4\u0017pU8ve\u000e,wJ\u001a4tKR\u001c\b+\u001a8eS:<7\u000b^1uKR!\u00111JB~\u0011\u001d\u0019i\u0010\u0011a\u0001\u0007\u007f\f1!\u001b2q!\u0011!\t\u0001b\u0002\u000e\u0005\u0011\r!\u0002BA\u0002\t\u000bQ1a\\A\u0003\u0013\u0011!I\u0001b\u0001\u0003\u001f5+G/\u00193bi\u00064VM]:j_:\f\u0001e]3ukB4U\r^2iKJl\u0015M\\1hKJ\fe\u000e\u001a)beRLG/[8ogRqAq\u0002C\u0011\t[!\t\u0004b\r\u00058\u0011u\u0002\u0003CA\u0019\t#\u0019)\b\"\u0006\n\t\u0011M\u00111\u0007\u0002\u0007)V\u0004H.\u001a\u001a\u0011\r\u0005MDq\u0003C\u000e\u0013\u0011!I\"!\u001e\u0003\u0007M+G\u000f\u0005\u0003\u0004\u0018\u0011u\u0011\u0002\u0002C\u0010\u00073\u0011\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\rM\u0013\t1\u0001\u0005$A!AQ\u0005C\u0015\u001b\t!9C\u0003\u0003
\u0004.\u0012\u0015\u0011\u0002\u0002C\u0016\tO\u0011\u0001\"T8dWRKW.\u001a\u0005\b\t_\t\u0005\u0019AA\u0018\u00035qW/\u001c)beRLG/[8og\"91Q`!A\u0002\r}\b\"\u0003C\u001b\u0003B\u0005\t\u0019AAD\u00039)8/\u001a#v[6LH\u000b\u001b:fC\u0012D\u0011\u0002\"\u000fB!\u0003\u0005\r\u0001b\u000f\u0002)\t\u0014xn[3s\u0007>tg-[4Pm\u0016\u0014(/\u001b3f!!\u0019\taa:\u0003��\n}\b\"\u0003C \u0003B\u0005\t\u0019AAD\u0003A\u0019Gn\\;e)>\u001cEn\\;e\u0019&t7.\u0001\u0016tKR,\bOR3uG\",'/T1oC\u001e,'/\u00118e!\u0006\u0014H/\u001b;j_:\u001cH\u0005Z3gCVdG\u000f\n\u001b\u0016\u0005\u0011\u0015#\u0006BAD\u00053\f!f]3ukB4U\r^2iKJl\u0015M\\1hKJ\fe\u000e\u001a)beRLG/[8og\u0012\"WMZ1vYR$S'\u0006\u0002\u0005L)\"A1\bBm\u0003)\u001aX\r^;q\r\u0016$8\r[3s\u001b\u0006t\u0017mZ3s\u0003:$\u0007+\u0019:uSRLwN\\:%I\u00164\u0017-\u001e7uIY\n!c]3ukB4U\r^2iKJ$\u0006N]3bIRA\u00111\nC*\t+\"I\u0006C\u0004\u0004T\u0015\u0003\r\u0001b\t\t\u000f\u0011]S\t1\u0001\u0004v\u0005qa-\u001a;dQ\u0016\u0014X*\u00198bO\u0016\u0014\bb\u0002C\u0018\u000b\u0002\u0007\u0011qF\u0001!i\u0016\u001cHOR8mY><XM]%t)\"\u0014x\u000e\u001e;mK\u0012|e\u000eT8x\t&\u001c8\u000eK\u0004G\t?\")\u0007b\u001a\u0011\t\r\rE\u0011M\u0005\u0005\tG\u001a)I\u0001\u0005ESN\f'\r\\3e\u0003\u00151\u0018\r\\;fC\t!I'\u0001\u000fESN\\\u0007\u0005\u001e5s_R$H.\u001a\u0011jg\u0002rw\u000e\u001e\u0011baBd\u0017.\u001a3)\u0007\u0019\u001b9,A\rwKJLg-_'be.\u0014V\r\u001d7jG\u0006$\u0006N]8ui2,GCBA&\tc\"\u0019\bC\u0004\u0003x\u001d\u0003\rAa\u001f\t\u0013\u0011Ut\t%AA\u0002\u0005=\u0012!\u0002;j[\u0016\u001c\u0018a\t<fe&4\u00170T1sWJ+\u0007\u000f\\5dCRC'o\u001c;uY\u0016$C-\u001a4bk2$HEM\u000b\u0003\twRC!a\f\u0003Z\u0006A4\u000f[8vY\u0012tu\u000e\u001e$fi\u000eDG*Z1eKJ,\u0005o\\2i\u001f:4\u0015N]:u\r\u0016$8\r[,ji\"$&/\u001e8dCR,wJ\u001c$fi\u000eD\u0007fA%\u00048\u0006AB/Z:u\r\u0016$8\r[3s)\"\u0014X-\u00193CC\u000e\\wN\u001a4)\u0007)\u001b9,A\u000euKN$\u0018\t\u001a6vgRd\u0015mZ4j]\u001e\u0004\u0016M\u001d;ji&|gn\u001d\u0015\u0004\u0017\u000e]\u0016\u0001\f;fgRl\u
0015n\u001a:bi\u0016\u0004\u0016M\u001d;ji&|gn\u001d+p\u0013:\u001c\u0016P\\2B]\u0012$UMZ1vYR\u0004vn\u001c7tQ\ra5qW\u0001\u0015i\u0016\u001cH/Q;u_R+h.\u001a$fi\u000eDWM]:)\u00075\u001b9,\u0001\nwKJLg-_!vi>$VO\\3J]\u001a|GCDA&\t+#9\nb'\u0005&\u0012=F1\u0017\u0005\b\t/r\u0005\u0019AB;\u0011\u001d!IJ\u0014a\u0001\u0003_\t1B\\;n\r\u0016$8\r[3sg\"9AQ\u0014(A\u0002\u0011}\u0015a\u00057j].\u001cU/\\;mCRLg/\u001a\"zi\u0016\u001c\b\u0003BA\u0019\tCKA\u0001b)\u00024\t!Aj\u001c8h\u0011\u001d!9K\u0014a\u0001\tS\u000bA\u0002\\5oW\nKH/\u001a*bi\u0016\u0004B!!\r\u0005,&!AQVA\u001a\u0005\u0019!u.\u001e2mK\"9A\u0011\u0017(A\u0002\u0011}\u0015!\u00028po6\u001b\bb\u0002C[\u001d\u0002\u0007AqW\u0001\u000bC\u0012TWo\u001d;nK:$\b\u0003\u0002C]\t'tA\u0001b/\u0005N:!AQ\u0018Ce\u001d\u0011!y\fb2\u000f\t\u0011\u0005GQ\u0019\b\u0005\u0003_#\u0019-C\u0001r\u0013\ty\u0007/\u0003\u0002n]&\u0019A1\u001a7\u00021\rcWo\u001d;fe2Kgn\u001b$fi\u000eDWM\u001d+ie\u0016\fG-\u0003\u0003\u0005P\u0012E\u0017AD!eUV\u001cH/\\3oiRK\b/\u001a\u0006\u0004\t\u0017d\u0017\u0002\u0002Ck\t/\u0014a\"\u00113kkN$X.\u001a8u)f\u0004XM\u0003\u0003\u0005P\u0012E\u0017!\b<fe&4\u0017PR3uG\",'\u000f\u00165sK\u0006$\u0007+\u0019:uSRLwN\\:\u0015\u0011\u0005-CQ\u001cCs\tSDq!a\u0017P\u0001\u0004!y\u000eE\u0002|\tCL1\u0001b9m\u0005I\u0019E.^:uKJd\u0015N\\6GKR\u001c\u0007.\u001a:\t\u000f\u0011\u001dx\n1\u0001\u0003��\u0006)Ao\u001c9jG\"9A1^(A\u0002\u00115\u0018\u0001\u00059beRLG/[8o\u001dVl'-\u001a:t!\u0019\u0019\t\u0001b<\u00020%!A\u0011DB\u0006\u00039\"Xm\u001d;M_\u000e\fGNR3uG\"\u001cu.\u001c9mKRLwN\\%g\u0011&<\u0007nV1uKJl\u0017M]6Va\u0012\fG/\u001a3\u0015\t\u0005-CQ\u001f\u0005\b\to\u0004\u0006\u0019AAD\u0003QA\u0017n\u001a5XCR,'/\\1sWV\u0003H-\u0019;fI\":\u0001\u000bb\u0018\u0005f\u0011m\u0018E\u0001C\u007f\u0003q\u001aG.^:uKJ\u0004C.\u001b8lA\u0019,Go\u00195fe\u0002\u001aw.\u001c9mKR,7\u000f\t4fi\u000eDWm\u001d\u0011jM\u0002JG\u000f\t:fa2L7-\u0019;fI\u0002\"\u0017\r^1)\u0007A\u001b9,\u0001\ruKN$(\u000b]8NKR\u0014\u0018nY\"bY\
u000e,H.\u0019;j_:D3!UB\\\u0003}\"Xm\u001d;XC&$\u0018N\\4QCJ$\u0018\u000e^5p]N<\u0016\u000e\u001e5EKN$X\t]8dQ\n+\u0007.\u001b8e/\",gnU8ve\u000e,'+Z2pe\u0012\u001c\u0018I\u001d:jm\u0016D3AUB\\\u0003\u0005#Xm\u001d;XC&$\u0018N\\4QCJ$\u0018\u000e^5p]N<\u0016\u000e\u001e5EKN$X\t]8dQ\n+X\u000e]3e\u0005\u00164wN]3T_V\u00148-\u001a*fG>\u0014Hm]!se&4X\rK\u0002T\u0007o\u000b\u0011\u0006^3ti^\u000b\u0017\u000e^5oOB\u000b'\u000f^5uS>t7oV5uQN+7m\u001c8eCJLh)Y5mkJ,\u0007f\u0001+\u00048\u0006Yc/\u001a:jMf<\u0016-\u001b;j]\u001e\u0004\u0016M\u001d;ji&|gn\u001d#vKR{gj\\*pkJ\u001cWMU3d_J$7\u000f\u0006\u0004\u0002L\u0015UQ\u0011\u0004\u0005\b\u000b/)\u0006\u0019AAD\u0003\u0019\"Wm\u001d;Fa>\u001c\u0007NQ3iS:$w\u000b[3o'>,(oY3SK\u000e|'\u000fZ:BeJLg/\u001a\u0005\b\u000b7)\u0006\u0019AAD\u0003MA\u0017m]*fG>tG-\u0019:z\r\u0006LG.\u001e:f\u00035\"Xm\u001d;O_RLg-\u001f*fC\u0012Lhi\u001c:GKR\u001c\u0007nV5uQ6+H\u000e^5qY\u0016\u0004\u0016M\u001d;ji&|gn\u001d\u0015\u0004-\u000e]\u0016A\u000b;fgRtu\u000e^5gsJ+\u0017\rZ=G_J4U\r^2i/&$\bnU5oO2,\u0007+\u0019:uSRLwN\u001c\u0015\u0004/\u000e]\u0016!\u0007<fe&4\u0017PT8uS\u001aL(+Z1es\u001a{'OR3uG\"$B!a\u0013\u0006*!9Aq\u0006-A\u0002\u0005=\u0012A\t;fgR4U\r^2i%\u0016\u001c\bo\u001c8tK^KG\u000f\u001b)beRLG/[8o\t\u0006$\u0018\rK\u0002Z\u0007o\u000bA\u0005^3ti\u001a+Go\u00195SKN\u0004xN\\:f/&$\bNT8QCJ$\u0018\u000e^5p]\u0012\u000bG/\u0019\u0015\u00045\u000e]\u0016A\t;fgR4U\r^2i%\u0016\u001c\bo\u001c8tK^KG\u000f\u001b(p\u001d\u0016<X*Z:tC\u001e,7\u000fK\u0002\\\u0007o\u000b1\u0004^3tiV\u001bX-\u00138eKB,g\u000eZ3oiJ+G/\u001a8uS>t\u0007f\u0001/\u00048\u0006Yb/\u001a:jMf4U\r^2i%\u0016\u001c\bo\u001c8tK\"\u000bg\u000e\u001a7j]\u001e$\u0002\"a\u0013\u0006@\u0015eSQ\f\u0005\b\u000b\u0003j\u0006\u0019AC\"\u0003)\t\u0007\u000f]3oI&sgm\u001c\t\u0007\u0003c\t)+\"\u0012\u0011\t\u0015\u001dSQK\u0007\u0003\u000b\u0013RA!b\u0013\u0006N\u0005\u0019An\\4\u000b\t\u0015=S\u0011K\u0001\nS:$XM\u001d8bYNTA!b\u0015\u0002\u0006\u000591\u000f^8sC\u001e,\u0017\u0002BC,\u000b\u
0013\u0012Q\u0002T8h\u0003B\u0004XM\u001c3J]\u001a|\u0007bBC.;\u0002\u0007\u0011qQ\u0001\u0018g\"|W\u000f\u001c3D_6\u0004H.\u001a;f!V\u0014x-\u0019;pefDq!b\u0018^\u0001\u0004\t9)A\fvg\u0016Le\u000eZ3qK:$WM\u001c;SKR,g\u000e^5p]\u0006AC/Z:u-\u0006d\u0017\u000eZ1uK6K'O]8s)J,hnY1uS>tw+\u001b;i)>\u0004\u0018nY%eg\"\u001aala.\u0002WQ,7\u000f\u001e,bY&$\u0017\r^3NSJ\u0014xN\u001d+sk:\u001c\u0017\r^5p]^KG\u000f[8viR{\u0007/[2JIND3aXB\\\u0003E\"Xm\u001d;ESN\fG\u000e\\8x\u001b&\u0014(o\u001c:UeVt7-\u0019;j_:\u0014U\r\\8x\u0011^kei\u001c:Is\n\u0014\u0018\u000e\u001a'j].D3\u0001YB\\\u0003A\"Xm\u001d;ESN\fG\u000e\\8x\u001b&\u0014(o\u001c:UeVt7-\u0019;j_:\u0014U\r\\8x\u0011^kei\u001c:DY>,H\rT5oW\"\u001a\u0011ma.\u0002?Q,7\u000f\u001e+sk:\u001c\u0017\r^5p]2+7o\u001d+iC:Du+T'fiJL7\rK\u0002c\u0007o\u000baD^3sS\u001aLh+\u00197jI\u0006$X-T5se>\u0014HK];oG\u0006$\u0018n\u001c8\u0015\u0011\u0005-S\u0011PC?\u000b\u0003Cq!b\u001fd\u0001\u0004\t9)A\tiCN\u001cv.\u001e:dKR{\u0007/[2JIND\u0011\"b d!\u0003\u0005\r!a\"\u0002/\u0005dGn\\<UeVt7-\u0019;j_:\u0014U\r\\8x\u0011^k\u0005\"CCBGB\u0005\t\u0019AAD\u0003II7o\u00117pk\u0012$vn\u00117pk\u0012d\u0015N\\6\u0002QY,'/\u001b4z-\u0006d\u0017\u000eZ1uK6K'O]8s)J,hnY1uS>tG\u0005Z3gCVdG\u000f\n\u001a\u0002QY,'/\u001b4z-\u0006d\u0017\u000eZ1uK6K'O]8s)J,hnY1uS>tG\u0005Z3gCVdG\u000fJ\u001a\u00025Y,'/\u001b4z\u001f\u001a47/\u001a;SKF,Xm\u001d;WKJ\u001c\u0018n\u001c8\u0015\u0011\u0005-SQRCH\u000bCCqa!@g\u0001\u0004\u0019y\u0010C\u0004\u0006\u0012\u001a\u0004\r!b%\u00027=4gm]3u\r>\u0014H*Z1eKJ,\u0005o\\2i%\u0016\fX/Z:u!\u0011))*b'\u000f\t\rEWqS\u0005\u0005\u000b3\u001b\u0019.\u0001\u000fPM\u001a\u001cX\r^:G_JdU-\u00193fe\u0016\u0003xn\u00195SKF,Xm\u001d;\n\t\u0015uUq\u0014\u0002\b\u0005VLG\u000eZ3s\u0015\u0011)Ija5\t\u000f\u0015\rf\r1\u0001\u0006&\u0006\u0011B.[:u\u001f\u001a47/\u001a;t%\u0016\fX/Z:u!\u0011)9+\",\u000f\t\rEW\u0011V\u0005\u0005\u000bW\u001b\u0019.\u0001\nMSN$xJ\u001a4tKR\u001c(+Z9vKN$\u0018\u0002BCO\u000b_SA!b+\u0004T\u00061b-\u001a;dQJ+7\u
000f]8og\u0016\fE\u000e\\8dCR|'\u000f\u0006\u0004\u00066\u0016mVQ\u0018\t\u0004w\u0016]\u0016bAC]Y\n\t3\t\\;ti\u0016\u0014H*\u001b8l\r\u0016$8\r\u001b*fgB|gn]3BY2|7-\u0019;pe\"9!QS4A\u0002\t]\u0005\"CC`OB\u0005\t\u0019ACa\u0003-1W\r^2iKJ\u0004vn\u001c7\u0011\u0007U,\u0019-C\u0002\u0006F:\u00141BR3uG\",'\u000fU8pY\u0006\u0001c-\u001a;dQJ+7\u000f]8og\u0016\fE\u000e\\8dCR|'\u000f\n3fM\u0006,H\u000e\u001e\u00133+\t)YM\u000b\u0003\u0006B\ne\u0017aE1mY2Kgn[3e!\u0006\u0014H/\u001b;j_:\u001cH\u0003BCi\u000bG\u0004\u0002\"b5\u0006Z\u0006eWQ\\\u0007\u0003\u000b+TA!b6\u0004,\u0006Q1m\u001c8dkJ\u0014XM\u001c;\n\t\u0015mWQ\u001b\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bcA>\u0006`&\u0019Q\u0011\u001d7\u0003#A\u000b'\u000f^5uS>t\u0017I\u001c3Ti\u0006$X\rC\u0004\u0005X%\u0004\ra!\u001e\u0002%\u0011,wM]1eK\u0012\u0004\u0016M\u001d;ji&|gn\u001d\u000b\u0005\u000bS,\t\u0010\u0005\u0005\u0006T\u0016e\u0017\u0011\\Cv!\rYXQ^\u0005\u0004\u000b_d'!E'jeJ|'OR1jYV\u0014X\rV=qK\"9Aq\u000b6A\u0002\rU\u0004")
/* loaded from: input_file:kafka/server/link/ClusterLinkFetcherThreadTest.class */
public class ClusterLinkFetcherThreadTest extends ReplicaFetcherThreadTest {
    // Cluster link configuration used by the tests; replaced through clusterLinkConfig_$eq.
    private ClusterLinkConfig clusterLinkConfig;
    // Fetcher thread under test (name-mangled field); has no initializer, so it starts out null.
    private ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;
    // Replica manager used by the tests (name-mangled field); has no initializer, so it starts out null.
    private ReplicaManager kafka$server$link$ClusterLinkFetcherThreadTest$$replicaManager;
    // Identity of the cluster link: a random id per test instance and a fixed name.
    private final Uuid clusterLinkId = Uuid.randomUuid();
    private final String clusterLinkName = "testCluster";
    // Value written to "replica.fetch.backoff.ms" when the link config is built.
    private final int clusterLinkBackoffMs = 100;
    // NOTE(review): the initializer references an unrelated test-kit constant; this looks like a
    // decompiler literal-to-constant substitution -- confirm the intended wait time in the original source.
    private final int partitionLaggingThrottleOrMigrateWaitTimeMs = TestKitNodes.CONTROLLER_ID_OFFSET;
    // Fetcher threads that cleanup() shuts down after each test.
    private final ListBuffer<ClusterLinkFetcherThread> kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads = (ListBuffer) ListBuffer$.MODULE$.apply(Nil$.MODULE$);
    // Set to true by the test fetcher thread's delayPartitions override; reset in cleanup().
    private boolean isDelayed = false;
    // Topic id returned by the stubbed fetchTopicMetadata of the test leader end point.
    private Uuid sourceTopicId = Uuid.randomUuid();
    // When defined, the stubbed leader end point calls throw this exception.
    private Option<Exception> leaderEndPointException = None$.MODULE$;
    // When non-empty, returned by the stubbed fetchEpochEndOffsets instead of delegating to super.
    private Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> epochEndOffsets = Predef$.MODULE$.Map().empty();
    private int minInsyncReplicas = 2;
    // When non-empty, returned by the stubbed fetchLatestOffsets instead of delegating to super.
    private volatile Map<TopicPartition, ListOffsetsResponseData.ListOffsetsPartitionResponse> latestOffsets = Predef$.MODULE$.Map().empty();
    // Value reported by isReadyForFetch of the test leader end point.
    private boolean kafka$server$link$ClusterLinkFetcherThreadTest$$readyForFetch = true;
    // Response sizes fed into the broker's cluster-link fetch response size properties.
    private final FetchResponseSize fetchResponseSize = new FetchResponseSize(500, 1000);
    private final ClusterLinkMetrics clusterLinkMetrics = new ClusterLinkMetrics(clusterLinkName(), clusterLinkId(), ClusterLinkConfig.LinkMode.DESTINATION, ConnectionMode$Outbound$.MODULE$, ConnectionMode$Inbound$.MODULE$, false, (ClusterLinkManager) null, None$.MODULE$, new Metrics(), None$.MODULE$, false);
    // Mockito mocks for collaborators.
    private final ClusterLinkSelectorMetricsRegistry selectorMetricsRegistry = (ClusterLinkSelectorMetricsRegistry) Mockito.mock(ClusterLinkSelectorMetricsRegistry.class);
    private final ClusterLinkOutboundConnectionManager connManager = (ClusterLinkOutboundConnectionManager) Mockito.mock(ClusterLinkOutboundConnectionManager.class);

    /** Returns the id of the cluster link used by this test. */
    public Uuid clusterLinkId() {
        return clusterLinkId;
    }

    /** Returns the name of the cluster link used by this test. */
    public String clusterLinkName() {
        return clusterLinkName;
    }

    /** Returns the backoff value, in milliseconds, stored for the cluster link configuration. */
    private int clusterLinkBackoffMs() {
        return clusterLinkBackoffMs;
    }

    /** Returns the cluster link configuration currently held by this test. */
    public ClusterLinkConfig clusterLinkConfig() {
        return clusterLinkConfig;
    }

    /**
     * Replaces the cluster link configuration held by this test.
     *
     * @param clusterLinkConfig the configuration to store
     */
    public void clusterLinkConfig_$eq(ClusterLinkConfig clusterLinkConfig) {
        this.clusterLinkConfig = clusterLinkConfig;
    }

    /** Returns the stored lagging-partition wait time, in milliseconds. */
    private int partitionLaggingThrottleOrMigrateWaitTimeMs() {
        return partitionLaggingThrottleOrMigrateWaitTimeMs;
    }

    /** Name-mangled getter for the fetcher thread under test; may be {@code null} if none was assigned. */
    public ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;
    }

    /**
     * Name-mangled setter for the fetcher thread under test.
     *
     * @param clusterLinkFetcherThread the fetcher thread to store
     */
    public void kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread_$eq(ClusterLinkFetcherThread clusterLinkFetcherThread) {
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread = clusterLinkFetcherThread;
    }

    /** Name-mangled getter for the buffer of fetcher threads tracked by this test. */
    public ListBuffer<ClusterLinkFetcherThread> kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads;
    }

    /** Returns the current value of the {@code isDelayed} flag. */
    public boolean isDelayed() {
        return isDelayed;
    }

    /**
     * Sets the {@code isDelayed} flag.
     *
     * @param z the new flag value
     */
    public void isDelayed_$eq(boolean z) {
        isDelayed = z;
    }

    /** Returns the source topic id currently held by this test. */
    public Uuid sourceTopicId() {
        return sourceTopicId;
    }

    /**
     * Replaces the source topic id held by this test.
     *
     * @param uuid the topic id to store
     */
    public void sourceTopicId_$eq(Uuid uuid) {
        sourceTopicId = uuid;
    }

    /** Returns the optional exception configured for the leader end point stub. */
    public Option<Exception> leaderEndPointException() {
        return leaderEndPointException;
    }

    /**
     * Replaces the optional exception configured for the leader end point stub.
     *
     * @param option the exception option to store
     */
    public void leaderEndPointException_$eq(Option<Exception> option) {
        leaderEndPointException = option;
    }

    /** Returns the per-partition epoch end offsets currently held by this test. */
    public Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> epochEndOffsets() {
        return epochEndOffsets;
    }

    /**
     * Replaces the per-partition epoch end offsets held by this test.
     *
     * @param map the offsets to store
     */
    public void epochEndOffsets_$eq(Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> map) {
        epochEndOffsets = map;
    }

    /** Returns the {@code minInsyncReplicas} value held by this test. */
    public int minInsyncReplicas() {
        return minInsyncReplicas;
    }

    /**
     * Replaces the {@code minInsyncReplicas} value held by this test.
     *
     * @param i the value to store
     */
    public void minInsyncReplicas_$eq(int i) {
        minInsyncReplicas = i;
    }

    /** Returns the per-partition latest offsets currently held by this test (backed by a volatile field). */
    public Map<TopicPartition, ListOffsetsResponseData.ListOffsetsPartitionResponse> latestOffsets() {
        return latestOffsets;
    }

    /**
     * Replaces the per-partition latest offsets held by this test.
     *
     * @param map the offsets to store
     */
    public void latestOffsets_$eq(Map<TopicPartition, ListOffsetsResponseData.ListOffsetsPartitionResponse> map) {
        latestOffsets = map;
    }

    /** Name-mangled getter for the {@code readyForFetch} flag. */
    public boolean kafka$server$link$ClusterLinkFetcherThreadTest$$readyForFetch() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$readyForFetch;
    }

    /**
     * Sets the {@code readyForFetch} flag.
     *
     * @param z the new flag value
     */
    private void readyForFetch_$eq(boolean z) {
        kafka$server$link$ClusterLinkFetcherThreadTest$$readyForFetch = z;
    }

    /** Returns the fetch response size settings used by this test. */
    public FetchResponseSize fetchResponseSize() {
        return fetchResponseSize;
    }

    /** Returns the cluster link metrics instance owned by this test. */
    public ClusterLinkMetrics clusterLinkMetrics() {
        return clusterLinkMetrics;
    }

    /** Returns the selector metrics registry held by this test. */
    public ClusterLinkSelectorMetricsRegistry selectorMetricsRegistry() {
        return selectorMetricsRegistry;
    }

    /** Returns the outbound connection manager held by this test. */
    public ClusterLinkOutboundConnectionManager connManager() {
        return connManager;
    }

    /** Name-mangled getter for the replica manager used by this test; may be {@code null} if none was assigned. */
    public ReplicaManager kafka$server$link$ClusterLinkFetcherThreadTest$$replicaManager() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$replicaManager;
    }

    /**
     * Replaces the replica manager used by this test.
     *
     * @param replicaManager the replica manager to store
     */
    private void replicaManager_$eq(ReplicaManager replicaManager) {
        kafka$server$link$ClusterLinkFetcherThreadTest$$replicaManager = replicaManager;
    }

    /**
     * Builds the leader end point used by the tests: an anonymous {@code ClusterLinkLeaderEndPoint}
     * subclass whose metadata/offset lookups are driven by this test's mutable state.
     *
     * <p>The log context defaults to a fresh {@code LogContext} and the time to {@code Time.SYSTEM}
     * when the corresponding option is empty. The network client handed to the end point is a Mockito mock.
     *
     * <p>NOTE(review): this is decompiler output; the anonymous class below (constructor-style argument
     * list, relocated {@code super(...)} call, {@code $outer} wiring) is not valid Java as written.
     *
     * @param kafkaConfig broker configuration passed to the end point
     * @param replicaManager replica manager passed to the end point
     * @param replicaQuota quota passed to the end point
     * @param blockingSend sender whose broker end point id seeds the fetch session handler
     * @param option optional log context
     * @param option2 optional time source for the request builder
     * @return the stubbed leader end point
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    /* renamed from: createRemoteLeaderEndPoint */
    public RemoteLeaderEndPoint mo283createRemoteLeaderEndPoint(final KafkaConfig kafkaConfig, final ReplicaManager replicaManager, final ReplicaQuota replicaQuota, final BlockingSend blockingSend, Option<LogContext> option, Option<Time> option2) {
        final LogContext logContext = (LogContext) option.getOrElse(() -> {
            return new LogContext();
        });
        final FetchSessionHandler fetchSessionHandler = new FetchSessionHandler(logContext, blockingSend.brokerEndPoint().id());
        final ClusterLinkLeaderRequestBuilder clusterLinkLeaderRequestBuilder = new ClusterLinkLeaderRequestBuilder(clusterLinkConfig(), (Time) option2.getOrElse(() -> {
            return Time.SYSTEM;
        }));
        final ClusterLinkNetworkClient clusterLinkNetworkClient = (ClusterLinkNetworkClient) Mockito.mock(ClusterLinkNetworkClient.class);
        final ClusterLinkFollowerFetchThrottler clusterLinkFollowerFetchThrottler = new ClusterLinkFollowerFetchThrottler();
        return new ClusterLinkLeaderEndPoint(this, logContext, blockingSend, clusterLinkNetworkClient, fetchSessionHandler, clusterLinkLeaderRequestBuilder, clusterLinkFollowerFetchThrottler, kafkaConfig, replicaManager, replicaQuota) { // from class: kafka.server.link.ClusterLinkFetcherThreadTest$$anon$1
            private final /* synthetic */ ClusterLinkFetcherThreadTest $outer;

            // Readiness is controlled by the enclosing test's readyForFetch flag, regardless of partition.
            public boolean isReadyForFetch(TopicPartition topicPartition) {
                return this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$readyForFetch();
            }

            // Throws the configured exception if one is set; otherwise answers with the test's source topic id.
            public MetadataResponseData.MetadataResponseTopic fetchTopicMetadata(String str) {
                this.$outer.leaderEndPointException().foreach(exc -> {
                    throw exc;
                });
                return new MetadataResponseData.MetadataResponseTopic().setTopicId(this.$outer.sourceTopicId());
            }

            // Throws the configured exception if one is set; otherwise returns the test's canned epoch end
            // offsets, falling back to the superclass implementation when none are configured.
            public scala.collection.Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> fetchEpochEndOffsets(scala.collection.Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> map) {
                this.$outer.leaderEndPointException().foreach(exc -> {
                    throw exc;
                });
                return this.$outer.epochEndOffsets().isEmpty() ? super/*kafka.server.RemoteLeaderEndPoint*/.fetchEpochEndOffsets(map) : this.$outer.epochEndOffsets();
            }

            // Throws the configured exception if one is set; otherwise returns the test's canned latest
            // offsets, falling back to the superclass implementation when none are configured.
            public scala.collection.Map<TopicPartition, ListOffsetsResponseData.ListOffsetsPartitionResponse> fetchLatestOffsets(Set<TopicPartition> set) {
                this.$outer.leaderEndPointException().foreach(exc -> {
                    throw exc;
                });
                return this.$outer.latestOffsets().isEmpty() ? super.fetchLatestOffsets(set) : this.$outer.latestOffsets();
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                // NOTE(review): in this initializer `this` appears to stand for the enclosing test instance
                // (decompiler artifact) -- confirm against the original Scala source.
                super(logContext.logPrefix(), blockingSend, clusterLinkNetworkClient, fetchSessionHandler, clusterLinkLeaderRequestBuilder, clusterLinkFollowerFetchThrottler, kafkaConfig, this.clusterLinkConfig(), replicaManager, replicaQuota, this.clusterLinkMetrics());
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        };
    }

    /**
     * Supplies an empty option.
     * Presumably the compiler-generated default for the fifth (log context) parameter of
     * {@code createRemoteLeaderEndPoint} -- inferred from the {@code $default$5} naming.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public Option<LogContext> createRemoteLeaderEndPoint$default$5() {
        return None$.MODULE$;
    }

    /**
     * Supplies an empty option.
     * Presumably the compiler-generated default for the sixth (time) parameter of
     * {@code createRemoteLeaderEndPoint} -- inferred from the {@code $default$6} naming.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public Option<Time> createRemoteLeaderEndPoint$default$6() {
        return None$.MODULE$;
    }

    /**
     * Creates the fetcher thread used by the inherited tests: an anonymous {@code ClusterLinkFetcherThread}
     * subclass wired to a mocked fetcher manager (see {@code createFetcherManager}) and to the stubbed
     * leader end point built by {@code mo283createRemoteLeaderEndPoint}.
     *
     * <p>The parameters {@code i}, {@code brokerEndPoint}, {@code metrics} and {@code option} are not
     * referenced in the visible body.
     *
     * <p>NOTE(review): this is decompiler output. The anonymous class has no visible superclass constructor
     * call; the unused locals in its initializer look like the arguments of that lost call -- confirm
     * against the original Scala source.
     *
     * @return the fetcher thread with test overrides applied
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public ReplicaFetcherThread createReplicaFetcherThread(final String str, int i, BrokerEndPoint brokerEndPoint, final KafkaConfig kafkaConfig, final FailedPartitions failedPartitions, final PausedPartitions pausedPartitions, final ExponentialBackoff exponentialBackoff, final ReplicaManager replicaManager, Metrics metrics, final Time time, final ReplicaQuota replicaQuota, BlockingSend blockingSend, Option<TierStateFetcher> option, Option<LogContext> option2) {
        final ClusterLinkFetcherManager createFetcherManager = createFetcherManager();
        // The supplied time is always forwarded to the leader end point (wrapped in Some).
        final RemoteLeaderEndPoint mo283createRemoteLeaderEndPoint = mo283createRemoteLeaderEndPoint(kafkaConfig, replicaManager, replicaQuota, blockingSend, option2, new Some(time));
        return new ClusterLinkFetcherThread(this, str, mo283createRemoteLeaderEndPoint, kafkaConfig, createFetcherManager, failedPartitions, pausedPartitions, exponentialBackoff, replicaManager, replicaQuota, time) { // from class: kafka.server.link.ClusterLinkFetcherThreadTest$$anon$2
            private final /* synthetic */ ClusterLinkFetcherThreadTest $outer;

            // Intentionally a no-op in this test subclass.
            public void clearPartitionLinkFailure(TopicPartition topicPartition, long j, boolean z) {
            }

            // Delegates to the superclass, then records on the enclosing test that a delay happened.
            public void delayPartitions(Iterable<TopicPartition> iterable) {
                super/*kafka.server.AbstractFetcherThread*/.delayPartitions(iterable);
                this.$outer.isDelayed_$eq(true);
            }

            // Mirror truncation is always reported as valid in this test subclass.
            public boolean validateMirrorTruncation(TopicPartition topicPartition, OffsetTruncationState offsetTruncationState) {
                return true;
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                // None of the locals below is read again in this initializer.
                FetcherPool$Default$ fetcherPool$Default$ = FetcherPool$Default$.MODULE$;
                ClusterLinkLeaderEndPoint clusterLinkLeaderEndPoint = (ClusterLinkLeaderEndPoint) mo283createRemoteLeaderEndPoint;
                ClusterLinkConfig clusterLinkConfig = this.clusterLinkConfig();
                ClusterLinkMetadata clusterLinkMetadata = new ClusterLinkMetadata(kafkaConfig, this.clusterLinkName(), this.clusterLinkId(), ClusterLinkConfig.LinkMode.DESTINATION, 100L, 60000L);
                ClusterLinkMetrics clusterLinkMetrics = this.clusterLinkMetrics();
                ClusterLinkFetchResponseAllocator clusterLinkFetchResponseAllocator = new ClusterLinkFetchResponseAllocator(kafkaConfig, FetcherPool$Default$.MODULE$);
                ClusterLinkNetworkClient clusterLinkNetworkClient = (ClusterLinkNetworkClient) Mockito.mock(ClusterLinkNetworkClient.class);
                None$ none$ = None$.MODULE$;
                None$ none$2 = None$.MODULE$;
            }
        };
    }

    /**
     * Creates a Mockito mock of {@link ClusterLinkFetcherManager} with the stubbing the fetcher thread
     * tests rely on.
     *
     * @return the stubbed fetcher manager mock
     */
    public ClusterLinkFetcherManager createFetcherManager() {
        final ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) Mockito.mock(ClusterLinkFetcherManager.class);

        // Partition lookups always come back empty.
        Mockito.when(fetcherManager.partition((TopicPartition) ArgumentMatchers.any(TopicPartition.class)))
            .thenReturn(None$.MODULE$);

        // Fetch state updates are accepted and ignored.
        ((ClusterLinkFetcherManager) Mockito.doNothing().when(fetcherManager))
            .updatePartitionFetchState((TopicPartition) ArgumentMatchers.any(TopicPartition.class), ArgumentMatchers.anyLong(), (Option) ArgumentMatchers.any());

        // The pending log end offset lookup echoes back its second argument.
        Mockito.when(BoxesRunTime.boxToLong(fetcherManager.getAndClearPendingLogEndOffsetUpdate((TopicPartition) ArgumentMatchers.any(TopicPartition.class), ArgumentMatchers.anyLong())))
            .thenAnswer(invocation -> invocation.getArgument(1));

        // Partition link failures are always answered with false.
        Mockito.when(BoxesRunTime.boxToBoolean(fetcherManager.onPartitionLinkFailure((TopicPartition) ArgumentMatchers.any(TopicPartition.class), (MirrorFailureType) ArgumentMatchers.any(MirrorFailureType.class), ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean())))
            .thenReturn(BoxesRunTime.boxToBoolean(false));

        return fetcherManager;
    }

    /** Installs a cluster link configuration built without any property overrides before each test. */
    @BeforeEach
    public void setup() {
        final Properties noOverrides = new Properties();
        final ClusterLinkConfig config = createClusterLinkConfig(noOverrides);
        clusterLinkConfig_$eq(config);
    }

    /**
     * Tears down per-test state: shuts down every tracked fetcher thread, shuts down the link metrics,
     * clears the {@code isDelayed} flag and finally runs the parent class cleanup.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @AfterEach
    public void cleanup() {
        final ListBuffer<ClusterLinkFetcherThread> trackedThreads = kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads();
        trackedThreads.foreach(thread -> {
            thread.shutdown();
            return BoxedUnit.UNIT;
        });
        final ClusterLinkMetrics linkMetrics = clusterLinkMetrics();
        linkMetrics.shutdown();
        isDelayed_$eq(false);
        super.cleanup();
    }

    /**
     * Builds the cluster link configuration used by the tests.
     *
     * <p>Starts from test defaults (bootstrap server taken from {@code brokerEndPoint()}, fetch backoff,
     * maximum lagging partitions and lagging-partition wait time) and then applies {@code properties}
     * on top, so caller-supplied entries win over the defaults.
     *
     * @param properties overrides applied after the defaults; may be empty
     * @return the configuration created from the merged properties
     */
    private ClusterLinkConfig createClusterLinkConfig(Properties properties) {
        final Properties merged = new Properties();
        merged.put("bootstrap.servers", brokerEndPoint().host() + ":" + brokerEndPoint().port());
        merged.put("replica.fetch.backoff.ms", Integer.toString(clusterLinkBackoffMs()));
        merged.put(ClusterLinkConfig$.MODULE$.LinkFetcherMaxLaggingPartitionsProp(), "2");
        merged.put(ClusterLinkConfig$.MODULE$.LinkFetcherLaggingPartitionMsProp(), Integer.toString(partitionLaggingThrottleOrMigrateWaitTimeMs()));
        // Overrides go last so they replace any default with the same key.
        merged.putAll(properties);
        return ClusterLinkConfig$.MODULE$.create(merged, None$.MODULE$, true);
    }

    /**
     * Overrides the parent test of the same name. With a cluster link manager created for
     * {@code MetadataVersion.IBP_0_11_0_IV0}, asserts that creating a cluster link fails with
     * {@link InvalidClusterLinkException}. The admin manager is always shut down afterwards.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        final int randomPort1 = testUtils.RandomPort();
        final int randomPort2 = testUtils.RandomPort();
        final int randomPort3 = testUtils.RandomPort();
        final int randomPort4 = testUtils.RandomPort();
        final KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(1, "localhost:1234", true, true, randomPort1, None$.MODULE$, None$.MODULE$, None$.MODULE$, true, false, randomPort2, false, randomPort3, false, randomPort4, None$.MODULE$, 1, false, 1, (short) 1, false));
        final Metrics metrics = new Metrics();
        final ClusterLinkAdminManager adminManager = new ClusterLinkAdminManager(brokerConfig, "clusterId", ClusterLinkTestUtils$.MODULE$.createClusterLinkManager(MetadataVersion.IBP_0_11_0_IV0), metrics, new MockTime(), ConfluentConfigs.buildMultitenantMetadata(brokerConfig.values(), metrics) != null);
        try {
            final NewClusterLink newLink = new NewClusterLink("test-link", "clusterId", Collections.emptyMap());
            Assertions.assertThrows(InvalidClusterLinkException.class, () -> {
                adminManager.createClusterLink(newLink, None$.MODULE$, new ListenerName("EXTERNAL"), false, false, 1000, 1).get();
            });
        } finally {
            adminManager.shutdown();
        }
    }

    /**
     * Exercises {@code processPartitionData} when a pending log end offset (150) has been registered
     * with the fetcher manager while the mocked log reports a log end offset of 100. The append on the
     * mocked partition is stubbed for offset 100 and the test asserts that the stubbed append info is
     * what {@code processPartitionData} returns.
     */
    @Test
    public void testPendingLogEndOffsetUpdate() {
        final MockTime time = new MockTime();
        final Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> managerAndPartitions = setupFetcherManagerAndPartitions(time, 1, MetadataVersion.latestTesting(), false, setupFetcherManagerAndPartitions$default$5(), false);
        if (managerAndPartitions == null) {
            throw new MatchError((Object) null);
        }
        final ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) managerAndPartitions._1();
        setupFetcherThread(time, fetcherManager, 1);

        // Mocked partition backed by a mocked log, registered with the replica manager.
        final Partition mockPartition = (Partition) Mockito.mock(Partition.class);
        final AbstractLog mockLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        final TopicPartition tp = new TopicPartition("topic", 0);
        Mockito.when(mockPartition.localLogOrException()).thenReturn(mockLog);
        Mockito.when(mockPartition.leaderLogIfLocal()).thenReturn(new Some(mockLog));
        Mockito.when(mockPartition.topicPartition()).thenReturn(tp);
        Mockito.when(kafka$server$link$ClusterLinkFetcherThreadTest$$replicaManager().getPartitionOrException(tp)).thenReturn(mockPartition);
        fetcherManager.addLinkedFetcherForPartitions(new $colon.colon(mockPartition, Nil$.MODULE$));

        // The log ends at 100 while the pending log end offset is 150.
        Mockito.when(BoxesRunTime.boxToLong(mockLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(100L));
        fetcherManager.setPendingLogEndOffset(tp, 150L);

        // Fetched data: a single record batch starting at offset 150.
        final FetchResponseData.PartitionData fetched = new FetchResponseData.PartitionData()
            .setRecords(MemoryRecords.withRecords((byte) 2, 150L, Compression.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))}))
            .setHighWatermark(200L)
            .setLogStartOffset(150L)
            .setPartitionIndex(0);

        final LogAppendInfo expectedAppendInfo = (LogAppendInfo) Mockito.mock(LogAppendInfo.class);
        Mockito.when(BoxesRunTime.boxToLong(expectedAppendInfo.numMessages())).thenReturn(BoxesRunTime.boxToLong(1L));
        // Only an append at offset 100 with the CLUSTER_LINK origin is stubbed on the partition.
        Mockito.when(mockPartition.appendRecordsToFollower(ArgumentMatchers.eq(100L), (AppendOrigin) ArgumentMatchers.eq(AppendOrigin.CLUSTER_LINK), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any())).thenReturn(new Some(expectedAppendInfo));
        // The replica manager mock runs its real append method for that same call shape.
        Mockito.when(kafka$server$link$ClusterLinkFetcherThreadTest$$replicaManager().appendRecordsToFollowerReplica((TopicPartition) ArgumentMatchers.eq(tp), ArgumentMatchers.eq(100L), (AppendOrigin) ArgumentMatchers.eq(AppendOrigin.CLUSTER_LINK), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any())).thenCallRealMethod();

        final Object actualAppendInfo = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().processPartitionData(tp, 150L, fetched).get();
        Assertions.assertSame(expectedAppendInfo, actualAppendInfo);
    }

    /**
     * Verifies the per-partition data of the fetch request built by the leader end point.
     *
     * <p>A single partition in the fetching state (fetch offset 150, leader epoch 1) is offered to
     * {@code fetchData}. The expected request entry carries that offset and epoch and uses
     * {@code fetchResponseSize().perPartitionSize()} as its max-bytes value. The fetch data is polled
     * every 100 ms for up to 15 seconds until the synthetic predicate accepts it, then compared with
     * the expected map.
     */
    @Test
    public void testFetchRequestPartitionMaxSize() {
        final MockTime time = new MockTime();
        final Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> managerAndPartitions = setupFetcherManagerAndPartitions(time, 1, MetadataVersion.latestTesting(), false, setupFetcherManagerAndPartitions$default$5(), false);
        if (managerAndPartitions == null) {
            throw new MatchError((Object) null);
        }
        final ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) managerAndPartitions._1();
        final scala.collection.mutable.Set partitions = (scala.collection.mutable.Set) managerAndPartitions._2();
        setupFetcherThread(time, fetcherManager, 1);

        final TopicPartition topicPartition = ((Partition) partitions.head()).topicPartition();
        final PartitionFetchState fetchState = new PartitionFetchState(new Some(Uuid.randomUuid()), 150L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, None$.MODULE$, 0);
        final Map fetchStates = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), fetchState)}));
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().acquireFetchBuffer();

        // Expected request entry per partition, derived from the fetch state.
        final Map expected = fetchStates.map(entry -> {
            if (entry == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition tp = (TopicPartition) entry._1();
            PartitionFetchState state = (PartitionFetchState) entry._2();
            return new Tuple2(tp, new FetchRequest.PartitionData((Uuid) state.topicId().get(), state.fetchOffset(), 0L, this.fetchResponseSize().perPartitionSize(), Optional.empty(), Optional.of(Predef$.MODULE$.int2Integer(state.currentLeaderEpoch())), Optional.empty()));
        });

        // Poll until the built fetch data is accepted or 15 seconds have elapsed; the last
        // observed value is asserted either way so a timeout surfaces as an assertion failure.
        final long deadlineMs = System.currentTimeMillis() + 15000;
        java.util.Map actual = fetchData(fetchStates);
        while (!$anonfun$testFetchRequestPartitionMaxSize$3(expected, actual) && System.currentTimeMillis() <= deadlineMs) {
            Thread.sleep(100L);
            actual = fetchData(fetchStates);
        }
        Assertions.assertEquals(CollectionConverters$.MODULE$.MapHasAsJava(expected).asJava(), actual);
    }

    /**
     * Asks the fetcher thread's leader end point to build a fetch for the given partition states and
     * returns the per-partition data of the resulting request.
     *
     * @param map fetch state per partition
     * @return the request's fetch data, or an empty map when no fetch was built
     * @throws MatchError if the build result is {@code null} or its result is neither {@code Some} nor {@code None}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public java.util.Map<TopicPartition, FetchRequest.PartitionData> fetchData(Map<TopicPartition, PartitionFetchState> map) {
        final AbstractFetcherThread.ResultWithPartitions built = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().leader().buildFetch(map);
        if (built == null) {
            throw new MatchError((Object) null);
        }
        final Option maybeFetch = (Option) built.result();
        if (maybeFetch instanceof Some) {
            final AbstractFetcherThread.ReplicaFetch replicaFetch = (AbstractFetcherThread.ReplicaFetch) ((Some) maybeFetch).value();
            return replicaFetch.fetchRequest().fetchData();
        } else if (None$.MODULE$.equals(maybeFetch)) {
            return Collections.emptyMap();
        } else {
            throw new MatchError(maybeFetch);
        }
    }

    /** Runs the source-offsets-pending-state verification with the latest testing metadata version. */
    @Test
    public void testSourceOffsetsPendingState() {
        final MetadataVersion latest = MetadataVersion.latestTesting();
        verifySourceOffsetsPendingState(latest);
    }

    /** Runs the source-offsets-pending-state verification with {@code MetadataVersion.IBP_2_6_IV0}. */
    @Test
    public void testSourceOffsetsPendingStateWithIbp26() {
        final MetadataVersion ibp26 = MetadataVersion.IBP_2_6_IV0;
        verifySourceOffsetsPendingState(ibp26);
    }

    /**
     * Checks how the partition's private {@code needsLinkedLeaderOffsets} field evolves once a fetcher
     * thread exists for the partition.
     *
     * <p>When the metadata version supports truncation on fetch, the flag must already be false after
     * the fetcher thread is created. Otherwise it must stay true until
     * {@code updateFetchOffsetAndMaybeMarkTruncationComplete} is called with a truncation state whose
     * second constructor argument is {@code true}.
     *
     * @param metadataVersion inter-broker protocol version the broker is configured with
     */
    private void verifySourceOffsetsPendingState(MetadataVersion metadataVersion) {
        final MockTime time = new MockTime();
        final Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> managerAndPartitions = setupFetcherManagerAndPartitions(time, 1, metadataVersion, false, setupFetcherManagerAndPartitions$default$5(), false);
        if (managerAndPartitions == null) {
            throw new MatchError((Object) null);
        }
        final ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) managerAndPartitions._1();
        final scala.collection.mutable.Set partitions = (scala.collection.mutable.Set) managerAndPartitions._2();
        final TopicPartition tp = new TopicPartition("topic", 0);
        final Partition linkedPartition = (Partition) partitions.head();

        // No fetcher thread may exist before metadata is published.
        Assertions.assertNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread created without metadata");
        fetcherManager.currentMetadata().update(
            1,
            RequestTestUtils.metadataUpdateWith(
                "cluster",
                1,
                Collections.singletonMap("topic", Errors.NONE),
                Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(1)),
                ignoredPartition -> Predef$.MODULE$.int2Integer(1),
                MetadataResponse.PartitionMetadata::new,
                ApiKeys.METADATA.latestVersion(),
                Collections.emptyMap(),
                true,
                Collections.emptyMap()),
            false,
            time.milliseconds());
        fetcherManager.onNewMetadata(fetcherManager.currentMetadata().fetch());
        Assertions.assertNotNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread not created");

        if (metadataVersion.isTruncationOnFetchSupported()) {
            Assertions.assertFalse(readNeedsLinkedLeaderOffsets(linkedPartition), new StringBuilder(24).append("State not reset for IBP ").append(metadataVersion).toString());
        } else {
            Assertions.assertTrue(readNeedsLinkedLeaderOffsets(linkedPartition), "State reset before fetching offsets");

            // An empty update must leave the flag untouched.
            kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete(Predef$.MODULE$.Map().empty());
            Assertions.assertTrue(readNeedsLinkedLeaderOffsets(linkedPartition), "State reset before source offsets available");

            // A truncation state created with `false` must leave the flag untouched.
            kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), new OffsetTruncationState(10L, false, None$.MODULE$))})));
            Assertions.assertTrue(readNeedsLinkedLeaderOffsets(linkedPartition), "State reset before truncation");

            // A truncation state created with `true` must clear the flag.
            kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), new OffsetTruncationState(10L, true, None$.MODULE$))})));
            Assertions.assertFalse(readNeedsLinkedLeaderOffsets(linkedPartition), "State not reset after truncation");
        }
    }

    /** Reads the partition's {@code needsLinkedLeaderOffsets} field via {@code TestUtils.fieldValue}. */
    private static boolean readNeedsLinkedLeaderOffsets(Partition partition) {
        return BoxesRunTime.unboxToBoolean(TestUtils.fieldValue(partition, Partition.class, "needsLinkedLeaderOffsets"));
    }

    /**
     * Creates the fetcher manager used by the tests together with the partitions linked to it.
     *
     * <p>Installs a mocked {@code ReplicaManager} on this test, builds partitions for indices
     * {@code 0 .. i - 1} through the synthetic per-index helper, creates a broker configuration with the
     * requested inter-broker protocol version and this test's fetch response sizes, applies {@code map}
     * on top of it, and finally instantiates the test fetcher manager, initializes its metadata and
     * registers the partitions with it.
     *
     * @param mockTime time source shared with the fetcher manager and partitions
     * @param i number of partitions to create
     * @param metadataVersion value written to {@code inter.broker.protocol.version}
     * @param z flag forwarded unchanged to the test fetcher manager
     * @param map extra broker properties, applied after the defaults
     * @param z2 flag forwarded unchanged to the test fetcher manager
     * @return the fetcher manager paired with the set of created partitions
     */
    private Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> setupFetcherManagerAndPartitions(MockTime mockTime, int i, MetadataVersion metadataVersion, boolean z, Map<String, String> map, boolean z2) {
        final LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        final AlterPartitionListener alterPartitionListener = (AlterPartitionListener) Mockito.mock(AlterPartitionListener.class);
        final LogConfig logConfig = (LogConfig) Mockito.mock(LogConfig.class);
        Mockito.when(BoxesRunTime.boxToInteger(logConfig.minInSyncReplicas())).thenReturn(BoxesRunTime.boxToInteger(1));
        final AbstractLog log = (AbstractLog) Mockito.mock(AbstractLog.class);
        Mockito.when(log.config()).thenReturn(logConfig);
        replicaManager_$eq((ReplicaManager) Mockito.mock(ReplicaManager.class));
        Mockito.when(kafka$server$link$ClusterLinkFetcherThreadTest$$replicaManager().brokerTopicStats()).thenReturn(new BrokerTopicStats());

        // One partition per index, collected into a mutable set by the synthetic helper.
        final scala.collection.mutable.Set partitions = (scala.collection.mutable.Set) Set$.MODULE$.apply(Nil$.MODULE$);
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), i - 1).foreach(index -> {
            return $anonfun$setupFetcherManagerAndPartitions$1(this, mockTime, alterPartitionListener, logManager, log, partitions, BoxesRunTime.unboxToInt(index));
        });

        final TestUtils$ testUtils = TestUtils$.MODULE$;
        final int randomPort1 = testUtils.RandomPort();
        final int randomPort2 = testUtils.RandomPort();
        final int randomPort3 = testUtils.RandomPort();
        final int randomPort4 = testUtils.RandomPort();
        final Properties brokerProps = testUtils.createBrokerConfig(1, "localhost:1234", true, true, randomPort1, None$.MODULE$, None$.MODULE$, None$.MODULE$, true, false, randomPort2, false, randomPort3, false, randomPort4, None$.MODULE$, 1, false, 1, (short) 1, false);
        brokerProps.setProperty("inter.broker.protocol.version", metadataVersion.versionWithSuffix());
        brokerProps.setProperty("confluent.cluster.link.fetch.response.total.bytes", Integer.toString(fetchResponseSize().responseSize()));
        brokerProps.setProperty("confluent.cluster.link.fetch.response.min.bytes", Integer.toString(fetchResponseSize().perPartitionSize()));
        brokerProps.setProperty("confluent.cluster.link.insync.fetch.response.total.bytes", Integer.toString(fetchResponseSize().responseSize()));
        brokerProps.setProperty("confluent.cluster.link.insync.fetch.response.min.bytes", Integer.toString(fetchResponseSize().perPartitionSize()));
        // Caller-supplied properties are applied last so they override the values above.
        map.foreach(entry -> {
            return brokerProps.setProperty((String) entry._1(), (String) entry._2());
        });
        final KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        final BlockingSend blockingSend = newMockBlockingSend();
        ((BlockingSend) Mockito.doNothing().when(blockingSend)).close();
        final ClusterLinkFetcherThreadTest$$anon$3 fetcherManager = new ClusterLinkFetcherThreadTest$$anon$3(this, (ClusterLinkManager) Mockito.mock(ClusterLinkManager.class), brokerConfig, (ClusterLinkScheduler) Mockito.mock(ClusterLinkScheduler.class), mockTime, z2, blockingSend, z, i);
        fetcherManager.initializeMetadata();
        fetcherManager.addLinkedFetcherForPartitions(partitions);
        return new Tuple2<>(fetcherManager, partitions);
    }

    /**
     * Compiler-generated accessor; going by its {@code $default$4} suffix it presumably supplies the
     * default for the fourth parameter of {@code setupFetcherManagerAndPartitions} (confirm against the
     * Scala source).
     *
     * @return always {@code false}
     */
    private boolean setupFetcherManagerAndPartitions$default$4() {
        return false;
    }

    /**
     * Supplies an empty Scala map. The tests in this class pass the result as the fifth argument of
     * {@code setupFetcherManagerAndPartitions} when they need no extra entries.
     *
     * @return an empty map
     */
    private Map<String, String> setupFetcherManagerAndPartitions$default$5() {
        Map<String, String> noEntries = Predef$.MODULE$.Map().empty();
        return noEntries;
    }

    /**
     * Compiler-generated accessor; going by its {@code $default$6} suffix it presumably supplies the
     * default for the sixth parameter of {@code setupFetcherManagerAndPartitions} (confirm against the
     * Scala source).
     *
     * @return always {@code false}
     */
    private boolean setupFetcherManagerAndPartitions$default$6() {
        return false;
    }

    /**
     * Feeds the fetcher manager a metadata update for {@code "topic"} and checks that a fetcher thread
     * appears as a result.
     *
     * @param mockTime clock whose current time stamps the metadata update
     * @param clusterLinkFetcherManager manager whose metadata is updated and then re-published
     * @param i partition count advertised for {@code "topic"} in the metadata update
     */
    public void setupFetcherThread(MockTime mockTime, ClusterLinkFetcherManager clusterLinkFetcherManager, int i) {
        // Precondition: no fetcher thread exists before the manager has seen any metadata.
        Assertions.assertNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread created without metadata");

        clusterLinkFetcherManager.currentMetadata().update(
                1,
                RequestTestUtils.metadataUpdateWith(
                        "cluster",
                        1,
                        Collections.singletonMap("topic", Errors.NONE),
                        Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(i)),
                        topicPartition -> Predef$.MODULE$.int2Integer(1),
                        MetadataResponse.PartitionMetadata::new,
                        ApiKeys.METADATA.latestVersion(),
                        Collections.emptyMap(),
                        true,
                        Collections.emptyMap()),
                false,
                mockTime.milliseconds());

        // Hand the freshly updated metadata snapshot back to the manager.
        clusterLinkFetcherManager.onNewMetadata(clusterLinkFetcherManager.currentMetadata().fetch());

        // Postcondition: exactly one fetcher thread and nothing left unassigned.
        Assertions.assertNotNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread not created");
        Assertions.assertEquals(1, clusterLinkFetcherManager.fetcherThreadMap().size());
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
    }

    /**
     * Inherited test that is switched off for this class via {@code @Disabled}; the stated reason is
     * that the disk throttle is not applied. The body only delegates to the parent implementation.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @Disabled("Disk throttle is not applied")
    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        super.testFollowerIsThrottledOnLowDisk();
    }

    /**
     * Verifies through Mockito that {@code markClusterLinkReplicaThrottle()} was invoked on the given
     * replica manager the expected number of times.
     *
     * @param replicaManager the replica manager to verify (passed to {@code Mockito.verify})
     * @param i the exact number of expected invocations
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public void verifyMarkReplicaThrottle(ReplicaManager replicaManager, int i) {
        ReplicaManager verification = Mockito.verify(replicaManager, Mockito.times(i));
        verification.markClusterLinkReplicaThrottle();
    }

    /**
     * Compiler-generated accessor; going by its {@code $default$2} suffix it presumably supplies the
     * default for the second parameter of {@code verifyMarkReplicaThrottle} (confirm against the Scala
     * source).
     *
     * @return always {@code 1}
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public int verifyMarkReplicaThrottle$default$2() {
        return 1;
    }

    /**
     * Overrides the parent test to run {@code verifyFetchLeaderEpochOnFirstFetch} with the latest
     * testing metadata version and a second argument of {@code 1}.
     * NOTE(review): the meaning of the {@code 1} is defined by the parent helper, which is not visible
     * here (presumably an expected count) - confirm.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latestTesting(), 1);
    }

    /**
     * Runs the parent scenario {@code shouldPollIndefinitelyIfLeaderReturnsAnyException} and then
     * asserts that {@code isDelayed()} reports {@code true}.
     */
    @Test
    public void testFetcherThreadBackoff() {
        // Reuse the parent's failing-leader scenario to drive the fetcher.
        super.shouldPollIndefinitelyIfLeaderReturnsAnyException();
        Assertions.assertTrue(isDelayed());
    }

    /**
     * Drives {@code maybeAdjustFetcherLaggingPartitions()} on a four-partition fetcher manager while
     * advancing a mock clock, and checks the throttled/unassigned partition counts after each step.
     *
     * <p>Phases, as asserted below:
     * <ol>
     *   <li>All partitions lag, half the wait time elapses: nothing is throttled.</li>
     *   <li>More than the wait time elapses: two partitions are throttled, expected to be 2 and 3.</li>
     *   <li>Partitions 0 and 1 report zero lag and a fresh caught-up time: nothing is throttled.</li>
     *   <li>Only partitions 2 and 3 lag again: the fetcher thread reports that no adjustment is required.</li>
     *   <li>All partitions lag again: partitions 2 and 3 are throttled once more.</li>
     *   <li>Re-adding the partitions to the manager clears the throttled set.</li>
     * </ol>
     */
    @Test
    public void testAdjustLaggingPartitions() {
        MockTime mockTime = new MockTime();
        Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> tuple2 = setupFetcherManagerAndPartitions(mockTime, 4, MetadataVersion.latestTesting(), false, setupFetcherManagerAndPartitions$default$5(), false);
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) tuple2._1();
        scala.collection.mutable.Set set = (scala.collection.mutable.Set) tuple2._2();
        setupFetcherThread(mockTime, clusterLinkFetcherManager, 4);
        String str = "topic";
        Metrics metrics = clusterLinkMetrics().metrics();
        // Register the mirror-topic message-rate sensor, record a value per partition and make every
        // partition lag by (partition index + 100).
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i -> {
            TopicPartition topicPartition = new TopicPartition(str, i);
            Map map = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("link-name"), this.clusterLinkName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic"), str)}));
            String sb = new StringBuilder(2).append(ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricName()).append("-").append(this.clusterLinkName()).append("-").append(str).toString();
            MetricName metricName = new MetricName(ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricName(), "cluster-link-metrics", ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricDescription(), CollectionConverters$.MODULE$.MapHasAsJava(map).asJava());
            Sensor sensor = this.clusterLinkMetrics().metrics().sensor(sb, metrics.config(), TimeUnit.MINUTES.toSeconds(5L), new Sensor[0]);
            Rate rate = new Rate();
            if (sensor == null) {
                // Compiler-generated null check; raises a NullPointerException.
                throw null;
            }
            sensor.add(metricName, rate, (MetricConfig) null);
            sensor.record(i, mockTime.milliseconds());
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(topicPartition, i + 100);
        });

        // Phase 1: only half of the wait time has elapsed, so no partition is throttled yet.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + (partitionLaggingThrottleOrMigrateWaitTimeMs() / 2));
        clusterLinkFetcherManager.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
        Assertions.assertEquals(0, clusterLinkFetcherManager.throttledPartitionCount());

        // Phase 2: well past the wait time, two partitions are throttled.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + (partitionLaggingThrottleOrMigrateWaitTimeMs() * 2));
        clusterLinkFetcherManager.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
        Assertions.assertEquals(2, clusterLinkFetcherManager.throttledPartitionCount());
        // The membership checks used to discard their result; assert them so the test actually
        // verifies WHICH partitions were throttled.
        Assertions.assertTrue(clusterLinkFetcherManager.throttledPartitions().contains(new TopicPartition("topic", 2)), "Partition topic-2 should be throttled");
        Assertions.assertTrue(clusterLinkFetcherManager.throttledPartitions().contains(new TopicPartition("topic", 3)), "Partition topic-3 should be throttled");

        // Phase 3: partitions 0 and 1 catch up (zero lag, fresh caught-up time); throttling is lifted.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + (partitionLaggingThrottleOrMigrateWaitTimeMs() * 2));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp(i2 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i2);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(topicPartition, 0L);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(topicPartition, mockTime.milliseconds());
        });
        clusterLinkFetcherManager.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
        Assertions.assertEquals(0, clusterLinkFetcherManager.throttledPartitionCount());

        // Phase 4: 0 and 1 stay caught up while 2 and 3 lag again; the thread reports NoAdjustment.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + (partitionLaggingThrottleOrMigrateWaitTimeMs() * 2));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp(i3 -> {
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(new TopicPartition("topic", i3), mockTime.milliseconds());
        });
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(2), 3).foreach$mVc$sp(i4 -> {
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(new TopicPartition("topic", i4), i4 + 100);
        });
        Assertions.assertEquals(new Tuple2(ClusterLinkFetcherThread$AdjustmentType$.MODULE$.NoAdjustment(), None$.MODULE$), kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().adjustLaggingPartitionsRequired(mockTime.milliseconds()));

        // Phase 5: every partition lags again past the wait time; 2 and 3 are throttled once more.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + (partitionLaggingThrottleOrMigrateWaitTimeMs() * 2));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i5 -> {
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(new TopicPartition("topic", i5), i5 + 100);
        });
        clusterLinkFetcherManager.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
        Assertions.assertEquals(2, clusterLinkFetcherManager.throttledPartitionCount());
        Assertions.assertTrue(clusterLinkFetcherManager.throttledPartitions().contains(new TopicPartition("topic", 2)), "Partition topic-2 should be throttled");
        Assertions.assertTrue(clusterLinkFetcherManager.throttledPartitions().contains(new TopicPartition("topic", 3)), "Partition topic-3 should be throttled");

        // Phase 6: re-adding the partitions to the manager leaves nothing throttled.
        clusterLinkFetcherManager.addLinkedFetcherForPartitions(set);
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
        Assertions.assertEquals(0, clusterLinkFetcherManager.throttledPartitionCount());
    }

    /**
     * Checks how partitions move between the Default and InSync fetcher pools as their lag and
     * caught-up times change on a mock clock.
     *
     * <p>Steps, as asserted below: partitions 0 and 1 catch up and move to the InSync pool; partition 1
     * lags for longer than the wait time and moves back to Default; partition 0 lags for only half the
     * wait time and stays in InSync; partition 0 then exceeds the wait time and moves back, leaving only
     * the Default fetcher; finally all four partitions catch up and only the InSync fetcher remains.
     */
    @Test
    public void testMigratePartitionsToInSyncAndDefaultPools() {
        MockTime time = new MockTime();
        Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> managerAndPartitions = setupFetcherManagerAndPartitions(time, 4, MetadataVersion.latestTesting(), false, setupFetcherManagerAndPartitions$default$5(), false);
        if (managerAndPartitions == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager manager = (ClusterLinkFetcherManager) managerAndPartitions._1();
        setupFetcherThread(time, manager, 4);

        // Initially there is a single fetcher and it belongs to the Default pool.
        Assertions.assertEquals(1, manager.fetcherThreadMap().size());
        Assertions.assertEquals(FetcherPool$Default$.MODULE$, ((FetcherTag) ((Tuple2) manager.fetcherThreadMap().head())._1()).fetcherPool());

        // Partitions 0 and 1 catch up with zero lag.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp(partitionId -> {
            TopicPartition caughtUp = new TopicPartition("topic", partitionId);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(caughtUp, time.milliseconds());
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(caughtUp, 0L);
        });
        manager.maybeMigrateInSyncPartitions(manager.maybeMigrateInSyncPartitions$default$1());

        FetcherTag defaultTag = new FetcherTag(0, 0, FetcherPool$Default$.MODULE$);
        FetcherTag inSyncTag = new FetcherTag(0, 0, FetcherPool$InSync$.MODULE$);

        // Both pools now have a fetcher: Default keeps {2, 3}, InSync owns {0, 1}.
        Assertions.assertEquals(2, manager.fetcherThreadMap().size());
        Assertions.assertTrue(manager.fetcherThreadMap().contains(defaultTag));
        Assertions.assertTrue(manager.fetcherThreadMap().contains(inSyncTag));
        final ClusterLinkFetcher defaultAfterFirstMigration = (ClusterLinkFetcher) manager.fetcherThreadMap().getOrElse(defaultTag, () -> null);
        verifyFetcherThreadPartitions(defaultAfterFirstMigration, "topic", (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{2, 3})));
        final ClusterLinkFetcher inSyncAfterFirstMigration = (ClusterLinkFetcher) manager.fetcherThreadMap().getOrElse(inSyncTag, () -> null);
        verifyFetcherThreadPartitions(inSyncAfterFirstMigration, "topic", (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1})));

        // Partition 1 starts lagging and stays behind for twice the wait time: back to Default.
        TopicPartition partitionOne = new TopicPartition("topic", 1);
        inSyncAfterFirstMigration.updatePartitionLastCaughtUpTime(partitionOne, time.milliseconds());
        time.setCurrentTimeMs(time.milliseconds() + (partitionLaggingThrottleOrMigrateWaitTimeMs() * 2));
        inSyncAfterFirstMigration.updateFetcherLagStats(partitionOne, 1L);
        manager.maybeMigrateLaggingPartitions();
        Assertions.assertEquals(2, manager.fetcherThreadMap().size());
        Assertions.assertTrue(manager.fetcherThreadMap().contains(defaultTag));
        Assertions.assertTrue(manager.fetcherThreadMap().contains(inSyncTag));
        final ClusterLinkFetcher defaultAfterPartitionOneLag = (ClusterLinkFetcher) manager.fetcherThreadMap().getOrElse(defaultTag, () -> null);
        verifyFetcherThreadPartitions(defaultAfterPartitionOneLag, "topic", (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2, 3})));
        final ClusterLinkFetcher inSyncAfterPartitionOneLag = (ClusterLinkFetcher) manager.fetcherThreadMap().getOrElse(inSyncTag, () -> null);
        verifyFetcherThreadPartitions(inSyncAfterPartitionOneLag, "topic", (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0})));

        // Partition 0 lags for only half the wait time: it must stay in the InSync pool.
        TopicPartition partitionZero = new TopicPartition("topic", 0);
        inSyncAfterPartitionOneLag.updatePartitionLastCaughtUpTime(partitionZero, time.milliseconds());
        time.setCurrentTimeMs(time.milliseconds() + (partitionLaggingThrottleOrMigrateWaitTimeMs() / 2));
        inSyncAfterPartitionOneLag.updateFetcherLagStats(partitionZero, 1L);
        manager.maybeMigrateLaggingPartitions();
        Assertions.assertEquals(2, manager.fetcherThreadMap().size());
        Assertions.assertTrue(manager.fetcherThreadMap().contains(defaultTag));
        Assertions.assertTrue(manager.fetcherThreadMap().contains(inSyncTag));
        final ClusterLinkFetcher defaultAfterShortLag = (ClusterLinkFetcher) manager.fetcherThreadMap().getOrElse(defaultTag, () -> null);
        verifyFetcherThreadPartitions(defaultAfterShortLag, "topic", (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2, 3})));
        final ClusterLinkFetcher inSyncAfterShortLag = (ClusterLinkFetcher) manager.fetcherThreadMap().getOrElse(inSyncTag, () -> null);
        verifyFetcherThreadPartitions(inSyncAfterShortLag, "topic", (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0})));

        // Partition 0 keeps lagging past the wait time: it moves back and only Default remains.
        time.setCurrentTimeMs(time.milliseconds() + (partitionLaggingThrottleOrMigrateWaitTimeMs() * 2));
        inSyncAfterShortLag.updateFetcherLagStats(partitionZero, 1L);
        manager.maybeMigrateLaggingPartitions();
        Assertions.assertEquals(1, manager.fetcherThreadMap().size());
        Assertions.assertTrue(manager.fetcherThreadMap().contains(defaultTag));
        final ClusterLinkFetcher soleDefaultFetcher = (ClusterLinkFetcher) manager.fetcherThreadMap().getOrElse(defaultTag, () -> null);
        verifyFetcherThreadPartitions(soleDefaultFetcher, "topic", (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1, 2, 3})));

        // Every partition catches up: all of them end up on the InSync fetcher, which is the only one left.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(partitionId -> {
            TopicPartition nowInSync = new TopicPartition("topic", partitionId);
            soleDefaultFetcher.updatePartitionLastCaughtUpTime(nowInSync, time.milliseconds());
            soleDefaultFetcher.updateFetcherLagStats(nowInSync, 0L);
        });
        manager.maybeMigrateInSyncPartitions(manager.maybeMigrateInSyncPartitions$default$1());
        Assertions.assertEquals(1, manager.fetcherThreadMap().size());
        Assertions.assertTrue(manager.fetcherThreadMap().contains(inSyncTag));
        ClusterLinkFetcher soleInSyncFetcher = (ClusterLinkFetcher) manager.fetcherThreadMap().getOrElse(inSyncTag, () -> null);
        verifyFetcherThreadPartitions(soleInSyncFetcher, "topic", (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1, 2, 3})));
    }

    /**
     * Exercises {@code maybeAutoTuneFetchers()} with auto-tuning enabled, a configured fetcher count of
     * "3" and a decrease-throughput threshold of 1000 bytes.
     *
     * <p>The test repeats one pattern: advance the mock clock by one second, record a per-partition
     * byte rate on the fetcher thread while summing it into {@code create.elem}, invoke the helper
     * {@code $anonfun$testAutoTuneFetchers$2} on every fetcher thread (defined outside this method;
     * its effect is not visible here), call {@code maybeAutoTuneFetchers()}, then check the recorded
     * auto-tune info and, in later rounds, the size of the fetcher thread map.
     */
    @Test
    public void testAutoTuneFetchers() {
        // Per-round sum of the byte rates recorded across the partitions.
        LongRef create = LongRef.create(0L);
        // Clock step between rounds: 1000 ms.
        long j = 1 * 1000;
        Properties properties = new Properties();
        properties.put(ClusterLinkConfig$.MODULE$.LinkFetcherAutoTuneEnableProp(), "true");
        properties.put(ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp(), "3");
        properties.put(ClusterLinkConfig$.MODULE$.LinkFetcherAutoTuneDecreaseThroughputThresholdBytesProp(), Integer.toString(1000));
        clusterLinkConfig_$eq(createClusterLinkConfig(properties));
        MockTime mockTime = new MockTime();
        // Four partitions; the extra map entry sets "confluent.cluster.link.max.client.connections" to 100.
        Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> tuple2 = setupFetcherManagerAndPartitions(mockTime, 4, MetadataVersion.latestTesting(), true, (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("confluent.cluster.link.max.client.connections"), Integer.toString(100))})), false);
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) tuple2._1();
        setupFetcherThread(mockTime, clusterLinkFetcherManager, 4);
        // Starting point: one Default-pool fetcher and no auto-tune info recorded yet.
        Assertions.assertEquals(1, clusterLinkFetcherManager.fetcherThreadMap().size());
        Assertions.assertEquals(FetcherPool$Default$.MODULE$, ((FetcherTag) ((Tuple2) clusterLinkFetcherManager.fetcherThreadMap().head())._1()).fetcherPool());
        Assertions.assertEquals((Object) null, clusterLinkFetcherManager.autoTuneInfo());
        // First call populates the auto-tune info with zero bytes/rate and NoAdjustment.
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        verifyAutoTuneInfo(clusterLinkFetcherManager, 1, 0L, 0.0d, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.NoAdjustment());
        // One second later with no traffic: still NoAdjustment.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + j);
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        verifyAutoTuneInfo(clusterLinkFetcherManager, 1, 0L, 0.0d, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.NoAdjustment());
        // Partition 0 reports a lag of 1 but no bytes were recorded: still NoAdjustment.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + j);
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(new TopicPartition("topic", 0), 1L);
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        verifyAutoTuneInfo(clusterLinkFetcherManager, 1, 0L, 0.0d, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.NoAdjustment());
        // Round with 1000 bytes per partition after marking a fetch request as throttled: NoAdjustment.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + j);
        clusterLinkFetcherManager.markFetchRequestThrottled();
        long j2 = 1000;
        int i = 1;
        create.elem = 0L;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 4 - 1).foreach$mVc$sp(i2 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i2);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherByteRate(topicPartition, j2);
            create.elem += j2;
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads().foreach(clusterLinkFetcherThread -> {
                $anonfun$testAutoTuneFetchers$2(topicPartition, i, clusterLinkFetcherThread);
                return BoxedUnit.UNIT;
            });
        });
        // Expected cumulative bytes so far and expected byte rate for this round.
        long j3 = 0 + create.elem;
        double d = create.elem / 1;
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        verifyAutoTuneInfo(clusterLinkFetcherManager, 1, j3, d, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.NoAdjustment());
        // Same traffic while the active-client-connections metric reports 99: NoAdjustment.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + j);
        Metrics metrics = clusterLinkMetrics().metrics();
        IntRef create2 = IntRef.create(100 - 1);
        metrics.addMetric(ClusterLinkMetrics$.MODULE$.activeClientConnectionsCountMetricName(), (metricConfig, j4) -> {
            return create2.elem;
        });
        long j5 = 1000;
        int i3 = 1;
        create.elem = 0L;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 4 - 1).foreach$mVc$sp(i22 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i22);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherByteRate(topicPartition, j5);
            create.elem += j5;
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads().foreach(clusterLinkFetcherThread -> {
                $anonfun$testAutoTuneFetchers$2(topicPartition, i3, clusterLinkFetcherThread);
                return BoxedUnit.UNIT;
            });
        });
        long j6 = j3 + create.elem;
        double d2 = create.elem / 1;
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        verifyAutoTuneInfo(clusterLinkFetcherManager, 1, j6, d2, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.NoAdjustment());
        // Connections metric drops to 50: the adjustment becomes Increase and a second fetcher appears.
        create2.elem = 100 / 2;
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + j);
        long j7 = 1000;
        int i4 = 1;
        create.elem = 0L;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 4 - 1).foreach$mVc$sp(i222 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i222);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherByteRate(topicPartition, j7);
            create.elem += j7;
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads().foreach(clusterLinkFetcherThread -> {
                $anonfun$testAutoTuneFetchers$2(topicPartition, i4, clusterLinkFetcherThread);
                return BoxedUnit.UNIT;
            });
        });
        long j8 = j6 + create.elem;
        double d3 = create.elem / 1;
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        verifyAutoTuneInfo(clusterLinkFetcherManager, 1, j8, d3, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.Increase());
        Assertions.assertEquals(2, clusterLinkFetcherManager.fetcherThreadMap().size());
        // Rate rises only slightly (1005 per partition): fetcher count stays at 2.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + j);
        long j9 = 1005;
        int i5 = 1;
        create.elem = 0L;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 4 - 1).foreach$mVc$sp(i2222 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i2222);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherByteRate(topicPartition, j9);
            create.elem += j9;
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads().foreach(clusterLinkFetcherThread -> {
                $anonfun$testAutoTuneFetchers$2(topicPartition, i5, clusterLinkFetcherThread);
                return BoxedUnit.UNIT;
            });
        });
        long j10 = j8 + create.elem;
        // NOTE(review): j11 is never read; the check below expects the previous round's rate (d3).
        long j11 = create.elem / 1;
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        Assertions.assertEquals(2, clusterLinkFetcherManager.fetcherThreadMap().size());
        verifyAutoTuneInfo(clusterLinkFetcherManager, 1, j10, d3, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.Increase());
        // Rate rises to 1200 per partition: a third fetcher is added and the info records 2 fetchers.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + j);
        long j12 = 1200;
        int i6 = 1;
        create.elem = 0L;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 4 - 1).foreach$mVc$sp(i22222 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i22222);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherByteRate(topicPartition, j12);
            create.elem += j12;
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads().foreach(clusterLinkFetcherThread -> {
                $anonfun$testAutoTuneFetchers$2(topicPartition, i6, clusterLinkFetcherThread);
                return BoxedUnit.UNIT;
            });
        });
        long j13 = j10 + create.elem;
        double d4 = create.elem / 1;
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        Assertions.assertEquals(3, clusterLinkFetcherManager.fetcherThreadMap().size());
        verifyAutoTuneInfo(clusterLinkFetcherManager, 2, j13, d4, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.Increase());
        // Rate rises to 1500 per partition: fetcher count stays at 3 and the adjustment is NoAdjustment.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + j);
        long j14 = 1500;
        int i7 = 1;
        create.elem = 0L;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 4 - 1).foreach$mVc$sp(i222222 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i222222);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherByteRate(topicPartition, j14);
            create.elem += j14;
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads().foreach(clusterLinkFetcherThread -> {
                $anonfun$testAutoTuneFetchers$2(topicPartition, i7, clusterLinkFetcherThread);
                return BoxedUnit.UNIT;
            });
        });
        long j15 = j13 + create.elem;
        double d5 = create.elem / 1;
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        Assertions.assertEquals(3, clusterLinkFetcherManager.fetcherThreadMap().size());
        verifyAutoTuneInfo(clusterLinkFetcherManager, 3, j15, d5, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.NoAdjustment());
        // Rate falls to 249 per partition (total below the 1000 threshold) with helper argument 0:
        // the adjustment is Decrease and a single fetcher remains.
        mockTime.setCurrentTimeMs(mockTime.milliseconds() + j);
        long j16 = (1000 / 4) - 1;
        int i8 = 0;
        create.elem = 0L;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 4 - 1).foreach$mVc$sp(i2222222 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i2222222);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherByteRate(topicPartition, j16);
            create.elem += j16;
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads().foreach(clusterLinkFetcherThread -> {
                $anonfun$testAutoTuneFetchers$2(topicPartition, i8, clusterLinkFetcherThread);
                return BoxedUnit.UNIT;
            });
        });
        long j17 = j15 + create.elem;
        double d6 = create.elem / 1;
        clusterLinkFetcherManager.maybeAutoTuneFetchers();
        Assertions.assertEquals(1, clusterLinkFetcherManager.fetcherThreadMap().size());
        verifyAutoTuneInfo(clusterLinkFetcherManager, 3, j17, d6, mockTime.milliseconds(), ClusterLinkFetcherThread$AdjustmentType$.MODULE$.Decrease());
    }

    /**
     * Asserts that the manager has recorded auto-tune info and that each of its fields matches the
     * expected value.
     *
     * @param clusterLinkFetcherManager manager whose {@code autoTuneInfo()} is inspected
     * @param i expected {@code numFetchers()}
     * @param j expected {@code linkCumulativeBytes()}
     * @param d expected {@code linkByteRate()} (compared exactly, without a tolerance)
     * @param j2 expected {@code nowMs()}
     * @param value expected {@code adjustment()}
     */
    private void verifyAutoTuneInfo(ClusterLinkFetcherManager clusterLinkFetcherManager, int i, long j, double d, long j2, Enumeration.Value value) {
        // assertNotNull reports a descriptive failure instead of a bare "expected true".
        Assertions.assertNotNull(clusterLinkFetcherManager.autoTuneInfo(), "Auto-tune info has not been recorded");
        Assertions.assertEquals(i, clusterLinkFetcherManager.autoTuneInfo().numFetchers());
        Assertions.assertEquals(j, clusterLinkFetcherManager.autoTuneInfo().linkCumulativeBytes());
        Assertions.assertEquals(d, clusterLinkFetcherManager.autoTuneInfo().linkByteRate());
        Assertions.assertEquals(j2, clusterLinkFetcherManager.autoTuneInfo().nowMs());
        Assertions.assertEquals(value, clusterLinkFetcherManager.autoTuneInfo().adjustment());
    }

    /**
     * Asserts that the fetcher owns exactly as many partitions as expected and that every expected
     * partition number of the given topic is among them.
     *
     * @param clusterLinkFetcher fetcher whose {@code partitions()} are inspected
     * @param str topic name used to build the expected {@code TopicPartition}s
     * @param set expected partition numbers
     */
    public void verifyFetcherThreadPartitions(ClusterLinkFetcher clusterLinkFetcher, String str, scala.collection.immutable.Set<Object> set) {
        Set assigned = clusterLinkFetcher.partitions();
        String sizeMismatch = "expected partition numbers " + set + ", got topic partitions " + assigned;
        Assertions.assertEquals(set.size(), assigned.size(), sizeMismatch);
        // Sizes match, so membership of each expected partition implies the sets are equal.
        set.foreach(partitionNumber -> {
            Assertions.assertTrue(assigned.contains(new TopicPartition(str, partitionNumber)));
        });
    }

    /**
     * Inherited test that is switched off for this class via {@code @Disabled}; the stated reason is
     * that the cluster link fetcher completes fetches if it replicated data. The body only delegates
     * to the parent implementation.
     *
     * @param z forwarded unchanged to the parent test
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @Disabled("cluster link fetcher completes fetches if it replicated data")
    @Test
    public void testLocalFetchCompletionIfHighWatermarkUpdated(boolean z) {
        super.testLocalFetchCompletionIfHighWatermarkUpdated(z);
    }

    /**
     * Checks {@code calculateMirrorTopicRpo} for {@code "test-topic"}: the result is empty while no
     * message-rate sensor exists or only a zero value has been recorded, it is {@code 0.0} for a lag of
     * zero, and for lags of 50 and 25 it matches (within a tolerance of 1.0) the lag divided by the
     * expected message rate, with the smaller lag giving the smaller value.
     */
    @Test
    public void testRpoMetricCalculation() {
        MockTime time = new MockTime();
        Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> managerAndPartitions = setupFetcherManagerAndPartitions(time, 1, MetadataVersion.latestTesting(), false, setupFetcherManagerAndPartitions$default$5(), false);
        if (managerAndPartitions == null) {
            throw new MatchError((Object) null);
        }
        setupFetcherThread(time, (ClusterLinkFetcherManager) managerAndPartitions._1(), 1);
        Metrics linkMetrics = clusterLinkMetrics().metrics();

        // Without a registered rate sensor there is nothing to compute an RPO from.
        Assertions.assertTrue(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 50).isEmpty());

        // Register the mirror-topic message-rate sensor for this link and topic.
        Map metricTags = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("link-name"), clusterLinkName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic"), "test-topic")}));
        String sensorName = ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricName() + "-" + clusterLinkName() + "-" + "test-topic";
        MetricName rateMetricName = new MetricName(ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricName(), "cluster-link-metrics", ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricDescription(), CollectionConverters$.MODULE$.MapHasAsJava(metricTags).asJava());
        Sensor rateSensor = clusterLinkMetrics().metrics().sensor(sensorName, linkMetrics.config(), TimeUnit.MINUTES.toSeconds(5L), new Sensor[0]);
        Rate messageRate = new Rate();
        if (rateSensor == null) {
            // Compiler-generated null check; raises a NullPointerException.
            throw null;
        }
        rateSensor.add(rateMetricName, messageRate, (MetricConfig) null);

        // Wall-clock reference used below to derive the expected rate.
        long startMs = System.currentTimeMillis();

        // A recorded value of zero still yields no RPO.
        rateSensor.record(0.0d, System.currentTimeMillis());
        Assertions.assertTrue(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 50).isEmpty());

        // With 100 recorded, a lag of zero gives an RPO of exactly zero.
        rateSensor.record(100.0d, System.currentTimeMillis());
        Option rpoForZeroLag = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 0L);
        Assertions.assertTrue(rpoForZeroLag.isDefined());
        Assertions.assertEquals(0.0d, BoxesRunTime.unboxToDouble(rpoForZeroLag.get()));

        // Lag of 50: expected value is lag / (100 / seconds elapsed plus a 30 s offset).
        Option rpoForLag50 = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 50);
        double expectedRpoForLag50 = 50 / (100.0d / (((System.currentTimeMillis() - startMs) + 30000) / 1000.0d));
        Assertions.assertTrue(rpoForLag50.isDefined());
        Assertions.assertEquals(expectedRpoForLag50, BoxesRunTime.unboxToDouble(rpoForLag50.get()), 1.0d);

        // Lag of 25: both the expected and the actual value must be smaller than for a lag of 50.
        Option rpoForLag25 = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 25);
        double expectedRpoForLag25 = 25 / (100.0d / (((System.currentTimeMillis() - startMs) + 30000) / 1000.0d));
        Assertions.assertTrue(expectedRpoForLag25 < expectedRpoForLag50);
        Assertions.assertTrue(rpoForLag25.isDefined());
        Assertions.assertTrue(BoxesRunTime.unboxToDouble(rpoForLag25.get()) < BoxesRunTime.unboxToDouble(rpoForLag50.get()));
        Assertions.assertEquals(expectedRpoForLag25, BoxesRunTime.unboxToDouble(rpoForLag25.get()), 1.0d);
    }

    /**
     * Waiting-for-source-records scenario in which the destination leader epoch is applied only
     * after the waiting state has been cleared, with no secondary failure injected.
     */
    @Test
    public void testWaitingPartitionsWithDestEpochBehindWhenSourceRecordsArrive() {
        final boolean bumpDestEpochAfterRecordsArrive = true;
        final boolean injectSecondaryFailure = false;
        verifyWaitingPartitionsDueToNoSourceRecords(bumpDestEpochAfterRecordsArrive, injectSecondaryFailure);
    }

    /**
     * Waiting-for-source-records scenario in which the destination leader epoch is applied before
     * the new source offset is published, with no secondary failure injected.
     */
    @Test
    public void testWaitingPartitionsWithDestEpochBumpedBeforeSourceRecordsArrive() {
        final boolean bumpDestEpochAfterRecordsArrive = false;
        final boolean injectSecondaryFailure = false;
        verifyWaitingPartitionsDueToNoSourceRecords(bumpDestEpochAfterRecordsArrive, injectSecondaryFailure);
    }

    /**
     * Waiting-for-source-records scenario in which a second failure is injected while the
     * partition is waiting; the destination epoch is applied before the new source offset.
     */
    @Test
    public void testWaitingPartitionsWithSecondaryFailure() {
        final boolean bumpDestEpochAfterRecordsArrive = false;
        final boolean injectSecondaryFailure = true;
        verifyWaitingPartitionsDueToNoSourceRecords(bumpDestEpochAfterRecordsArrive, injectSecondaryFailure);
    }

    /**
     * Drives one mirror partition through repeated source leader-epoch bumps while the source
     * reports offset 0, checks that the partition ends up waiting for source records, and then
     * verifies that the waiting state is cleared once the source reports offset 10.
     *
     * @param z  when true the destination {@code leaderEpoch} is applied only after the waiting
     *           state has cleared; when false it is applied before the new source offset is set
     * @param z2 when true a {@code TopicAuthorizationFailed} failure is injected while the
     *           partition waits; it must still be reported after the waiting state clears and
     *           no metadata update may be requested
     */
    private void verifyWaitingPartitionsDueToNoSourceRecords(boolean z, boolean z2) {
        final boolean bumpDestEpochAfterRecords = z;
        final boolean injectSecondaryFailure = z2;
        final long pollTimeoutMs = 15000L;
        final long pollIntervalMs = Math.min(pollTimeoutMs, 100L);

        MockTime time = new MockTime();
        Properties linkProps = new Properties();
        linkProps.put(ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp(), "0");
        clusterLinkConfig_$eq(createClusterLinkConfig(linkProps));
        Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> managerAndPartitions = setupFetcherManagerAndPartitions(time, 1, MetadataVersion.latestTesting(), false, setupFetcherManagerAndPartitions$default$5(), false);
        if (managerAndPartitions == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) managerAndPartitions._1();
        Partition mirrorPartition = (Partition) ((scala.collection.mutable.Set) managerAndPartitions._2()).head();

        // Make this broker the leader of the mirror partition, starting at epoch 1.
        mirrorPartition.leaderReplicaIdOpt_$eq(new Some(BoxesRunTime.boxToInteger(brokerEndPoint().id())));
        Mockito.when(((AbstractLog) mirrorPartition.log().get()).topicId()).thenReturn(new Some(Uuid.randomUuid()));
        IntRef epoch = IntRef.create(1);
        mirrorPartition.partitionState_$eq(new CommittedPartitionState((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerEndPoint().id()})), LeaderRecoveryState.RECOVERED, new Some(new ClusterLinkState(Uuid.randomUuid(), TopicLinkMirror$.MODULE$, new PartitionLinkState(epoch.elem, false, MirrorTopicError.NO_ERROR)))));
        TopicPartition tp = mirrorPartition.topicPartition();
        epochEndOffsets_$eq((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), new OffsetForLeaderEpochResponseData.EpochEndOffset().setEndOffset(0L))})));
        readyForFetch_$eq(false);
        setupFetcherThread(time, fetcherManager, 1);
        verifyReplicaState$1(ReplicaStatus.MirrorInfo.State.ACTIVE, fetcherManager, tp);
        Assertions.assertTrue(((PartitionAndState) ((ConcurrentHashMap) TestUtils.fieldValue(fetcherManager, ClusterLinkFetcherManager.class, "linkedPartitions")).get(tp)).partition().isActiveLinkDestinationLeader());
        verifyFetcherPartitions$1((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{tp.partition()})), tp);

        // Bump the source epoch up to the upper threshold; the partition must not enter the
        // waiting state yet, but the reported replica state flips once the lower threshold is hit.
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), MirrorFailureType$NoSourceRecords$.MODULE$.ConsecutiveEpochChangeUpperThreshold()).foreach$mVc$sp(i -> {
            epoch.elem++;
            fetcherManager.currentMetadata().update(1, RequestTestUtils.metadataUpdateWith("cluster", 1, Collections.singletonMap("topic", Errors.NONE), Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(1)), ignoredTp -> Predef$.MODULE$.int2Integer(epoch.elem), MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), Collections.emptyMap(), true, Collections.emptyMap()), false, time.milliseconds());
            fetcherManager.onNewMetadata(fetcherManager.currentMetadata().fetch());
            mirrorPartition.partitionState_$eq(new CommittedPartitionState((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{this.brokerEndPoint().id()})), LeaderRecoveryState.RECOVERED, new Some(new ClusterLinkState(Uuid.randomUuid(), TopicLinkMirror$.MODULE$, new PartitionLinkState(epoch.elem, false, MirrorTopicError.NO_ERROR)))));
            ReplicaStatus.MirrorInfo.State expectedState = i < MirrorFailureType$NoSourceRecords$.MODULE$.ConsecutiveEpochChangeLowerThreshold()
                    ? ReplicaStatus.MirrorInfo.State.ACTIVE
                    : ReplicaStatus.MirrorInfo.State.NO_SOURCE_RECORDS;
            verifyReplicaState$1(expectedState, fetcherManager, tp);
            Assertions.assertFalse(fetcherManager.isWaitingForSourceRecords(tp));
            TestUtils.setFieldValue(mirrorPartition, "leaderEpoch", BoxesRunTime.boxToInteger(epoch.elem));
            fetcherManager.onNewMetadata(fetcherManager.currentMetadata().fetch());
            this.verifyFetcherPartitions$1((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{tp.partition()})), tp);
        });

        // One more epoch bump while the source still reports offset 0: now the partition waits.
        latestOffsets_$eq((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(0).setLeaderEpoch(epoch.elem).setOffset(0L))})));
        epoch.elem++;
        fetcherManager.currentMetadata().update(1, RequestTestUtils.metadataUpdateWith("cluster", 1, Collections.singletonMap("topic", Errors.NONE), Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(1)), ignoredTp -> Predef$.MODULE$.int2Integer(epoch.elem), MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), Collections.emptyMap(), true, Collections.emptyMap()), false, time.milliseconds());
        fetcherManager.onNewMetadata(fetcherManager.currentMetadata().fetch());
        mirrorPartition.partitionState_$eq(new CommittedPartitionState((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerEndPoint().id()})), LeaderRecoveryState.RECOVERED, new Some(new ClusterLinkState(Uuid.randomUuid(), TopicLinkMirror$.MODULE$, new PartitionLinkState(epoch.elem, false, MirrorTopicError.NO_ERROR)))));
        verifyReplicaState$1(ReplicaStatus.MirrorInfo.State.NO_SOURCE_RECORDS, fetcherManager, tp);
        Assertions.assertTrue(fetcherManager.isWaitingForSourceRecords(tp));
        verifyFetcherPartitions$1((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{tp.partition()})), tp);

        // Advancing the mock clock alone must not clear the waiting state.
        time.sleep(clusterLinkConfig().availabilityCheckMs().longValue());
        Assertions.assertTrue(fetcherManager.isWaitingForSourceRecords(tp));
        verifyFetcherPartitions$1((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{tp.partition()})), tp);

        if (!bumpDestEpochAfterRecords) {
            TestUtils.setFieldValue(mirrorPartition, "leaderEpoch", BoxesRunTime.boxToInteger(epoch.elem));
            fetcherManager.onNewMetadata(fetcherManager.currentMetadata().fetch());
        }
        if (injectSecondaryFailure) {
            ((ConcurrentHashMap) TestUtils.fieldValue(fetcherManager, ClusterLinkFetcherManager.class, "degradedPartitions")).put(tp, MirrorFailureType$TopicAuthorizationFailed$.MODULE$);
            ((PartitionAndState) ((ConcurrentHashMap) TestUtils.fieldValue(fetcherManager, ClusterLinkFetcherManager.class, "linkedPartitions")).get(tp)).apiFailureType_$eq(new Some(MirrorFailureType$TopicAuthorizationFailed$.MODULE$));
            verifyReplicaState$1(ReplicaStatus.MirrorInfo.State.SOURCE_TOPIC_AUTHORIZATION_FAILED, fetcherManager, tp);
        }

        // Source now reports offset 10; poll (wall clock) until the waiting state is cleared.
        latestOffsets_$eq((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(0).setLeaderEpoch(epoch.elem).setOffset(10L))})));
        time.sleep(clusterLinkConfig().availabilityCheckMs().longValue());
        long waitClearedStartMs = System.currentTimeMillis();
        while (!$anonfun$verifyWaitingPartitionsDueToNoSourceRecords$4(fetcherManager, tp)) {
            if (System.currentTimeMillis() > waitClearedStartMs + pollTimeoutMs) {
                Assertions.fail("Waiting state not cleared with new source records");
            }
            Thread.sleep(pollIntervalMs);
        }
        verifyFetcherPartitions$1(Predef$.MODULE$.Set().empty(), tp);

        if (injectSecondaryFailure) {
            // The injected failure must still be the only degraded entry and no metadata refresh is expected.
            Assertions.assertEquals(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), MirrorFailureType$TopicAuthorizationFailed$.MODULE$)})), CollectionConverters$.MODULE$.ConcurrentMapHasAsScala((ConcurrentHashMap) TestUtils.fieldValue(fetcherManager, ClusterLinkFetcherManager.class, "degradedPartitions")).asScala());
            verifyReplicaState$1(ReplicaStatus.MirrorInfo.State.SOURCE_TOPIC_AUTHORIZATION_FAILED, fetcherManager, tp);
            Assertions.assertFalse(fetcherManager.currentMetadata().updateRequested(), "Metadata requested unnecessarily");
        } else {
            long metadataRequestedStartMs = System.currentTimeMillis();
            while (!$anonfun$verifyWaitingPartitionsDueToNoSourceRecords$6(fetcherManager)) {
                if (System.currentTimeMillis() > metadataRequestedStartMs + pollTimeoutMs) {
                    Assertions.fail("Metadata not requested");
                }
                Thread.sleep(pollIntervalMs);
            }
        }

        // Apply the destination epoch late if requested, then deliver metadata either way.
        if (bumpDestEpochAfterRecords) {
            TestUtils.setFieldValue(mirrorPartition, "leaderEpoch", BoxesRunTime.boxToInteger(epoch.elem));
        }
        fetcherManager.onNewMetadata(fetcherManager.currentMetadata().fetch());
        Assertions.assertFalse(fetcherManager.isWaitingForSourceRecords(tp));
        verifyFetcherPartitions$1((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{tp.partition()})), tp);
    }

    /** Runs the ready-for-fetch notification check with four partitions. */
    @Test
    public void testNotifyReadyForFetchWithMultiplePartitions() {
        final int partitionCount = 4;
        verifyNotifyReadyForFetch(partitionCount);
    }

    /** Runs the ready-for-fetch notification check with a single partition. */
    @Test
    public void testNotifyReadyForFetchWithSinglePartition() {
        final int partitionCount = 1;
        verifyNotifyReadyForFetch(partitionCount);
    }

    /**
     * Checks how ready-for-fetch notifications interact with the fetcher thread's
     * {@code partitionMapLock} / {@code partitionMapCond}.
     *
     * <p>A pool thread parks on the condition (under the lock) and flags that it is waiting.
     * With more than one partition, a notification issued while this test holds the lock must
     * leave the waiter parked, and a second notification is then issued without the lock. With a
     * single partition, a notification issued while this test holds the lock must queue exactly
     * one thread on the lock and stay incomplete until the lock is released. In both cases the
     * waiter must finish within 15 seconds and clear its flag.
     *
     * <p>The lock is always released in a {@code finally} block: the pool threads block in
     * {@code lock()} / {@code awaitUninterruptibly()}, which {@code shutdownNow()} cannot
     * interrupt, so a failed assertion must not leave the lock held by the test thread.
     *
     * @param i number of partitions to set up
     */
    private void verifyNotifyReadyForFetch(int i) {
        MockTime mockTime = new MockTime();
        Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> tuple2 = setupFetcherManagerAndPartitions(mockTime, i, MetadataVersion.latestTesting(), true, setupFetcherManagerAndPartitions$default$5(), false);
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) tuple2._1();
        scala.collection.mutable.Set set = (scala.collection.mutable.Set) tuple2._2();
        setupFetcherThread(mockTime, clusterLinkFetcherManager, i);
        ReentrantLock reentrantLock = (ReentrantLock) TestUtils.fieldValue(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), AbstractFetcherThread.class, "partitionMapLock");
        Condition condition = (Condition) TestUtils.fieldValue(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), AbstractFetcherThread.class, "partitionMapCond");
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        try {
            // Set while the pool thread is parked on the condition.
            BooleanRef waiting = BooleanRef.create(false);
            Future waiter = newFixedThreadPool.submit(() -> {
                CoreUtils$.MODULE$.inLock(reentrantLock, () -> {
                    waiting.elem = true;
                    condition.awaitUninterruptibly();
                    waiting.elem = false;
                });
            }, BoxesRunTime.boxToInteger(0));
            long waiterStartMs = System.currentTimeMillis();
            while (!waiting.elem) {
                if (System.currentTimeMillis() > waiterStartMs + 15000) {
                    Assertions.fail("Not waiting on condition");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
            if (set.size() > 1) {
                reentrantLock.lock();
                try {
                    maybeNotifyOnThreadNotHoldingLock$1(true, newFixedThreadPool, set);
                } finally {
                    reentrantLock.unlock();
                }
                Thread.yield();
                // The waiter must still be parked after the first notification.
                Assertions.assertTrue(waiting.elem);
                maybeNotifyOnThreadNotHoldingLock$1(true, newFixedThreadPool, set);
            } else {
                Future notifier;
                reentrantLock.lock();
                try {
                    notifier = maybeNotifyOnThreadNotHoldingLock$1(false, newFixedThreadPool, set);
                    // Poll until the queue-length predicate holds or 15 s elapse, keeping the last observation.
                    long queueStartMs = System.currentTimeMillis();
                    int queueLength;
                    while (true) {
                        queueLength = reentrantLock.getQueueLength();
                        if ($anonfun$verifyNotifyReadyForFetch$7(queueLength) || System.currentTimeMillis() > queueStartMs + 15000) {
                            break;
                        }
                        Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
                    }
                    Assertions.assertEquals(1, queueLength);
                    Assertions.assertFalse(notifier.isDone());
                } finally {
                    reentrantLock.unlock();
                }
                notifier.get(15L, TimeUnit.SECONDS);
            }
            waiter.get(15L, TimeUnit.SECONDS);
            Assertions.assertFalse(waiting.elem);
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }

    /**
     * Follower append reports 10 messages: delayed fetch requests are expected to be completed,
     * with the independent-retention flag off.
     */
    @Test
    public void testFetchResponseWithPartitionData() {
        final boolean expectDelayedFetchCompletion = true;
        final boolean independentRetention = false;
        LogAppendInfo appendInfo = (LogAppendInfo) Mockito.mock(LogAppendInfo.class);
        Mockito.when(BoxesRunTime.boxToLong(appendInfo.numMessages())).thenReturn(BoxesRunTime.boxToLong(10L));
        verifyFetchResponseHandling(new Some(appendInfo), expectDelayedFetchCompletion, independentRetention);
    }

    /** Follower append returns no append info: no delayed fetch completion is expected. */
    @Test
    public void testFetchResponseWithNoPartitionData() {
        final boolean expectDelayedFetchCompletion = false;
        final boolean independentRetention = false;
        verifyFetchResponseHandling(None$.MODULE$, expectDelayedFetchCompletion, independentRetention);
    }

    /** Follower append reports zero messages: no delayed fetch completion is expected. */
    @Test
    public void testFetchResponseWithNoNewMessages() {
        final boolean expectDelayedFetchCompletion = false;
        final boolean independentRetention = false;
        LogAppendInfo appendInfo = (LogAppendInfo) Mockito.mock(LogAppendInfo.class);
        Mockito.when(BoxesRunTime.boxToLong(appendInfo.numMessages())).thenReturn(BoxesRunTime.boxToLong(0L));
        verifyFetchResponseHandling(new Some(appendInfo), expectDelayedFetchCompletion, independentRetention);
    }

    /**
     * Follower append reports 10 messages with the independent-retention flag on, so the helper
     * expects appends to be made with an empty log-start-offset argument.
     */
    @Test
    public void testUseIndependentRetention() {
        final boolean expectDelayedFetchCompletion = true;
        final boolean independentRetention = true;
        LogAppendInfo appendInfo = (LogAppendInfo) Mockito.mock(LogAppendInfo.class);
        Mockito.when(BoxesRunTime.boxToLong(appendInfo.numMessages())).thenReturn(BoxesRunTime.boxToLong(10L));
        verifyFetchResponseHandling(new Some(appendInfo), expectDelayedFetchCompletion, independentRetention);
    }

    /**
     * Feeds the same fetch partition data (one record, high watermark 100, log start offset 50)
     * to a cluster-link fetcher for two partitions of {@code testTopic}, runs one
     * {@code doWork()} pass and verifies the interactions with the mocked replica manager and
     * partition.
     *
     * @param option value the mocked {@code Partition.appendRecordsToFollower} returns
     * @param z      when true, {@code completeDelayedFetchRequests} must be called exactly once
     *               with both partitions; when false it must never be called
     * @param z2     when true, the link is configured with {@code TopicConfigSyncIncludeProp} set
     *               to the always-synced configs and the last append argument is expected to be
     *               {@code Optional.empty()}; otherwise {@code Optional.of(50L)} is expected
     */
    private void verifyFetchResponseHandling(Option<LogAppendInfo> option, boolean z, boolean z2) {
        final boolean expectDelayedFetchCompletion = z;
        final boolean independentRetention = z2;

        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(
                1, "localhost:1234", true, true,
                TestUtils$.MODULE$.RandomPort(), None$.MODULE$, None$.MODULE$, None$.MODULE$, true, false,
                TestUtils$.MODULE$.RandomPort(), false,
                TestUtils$.MODULE$.RandomPort(), false,
                TestUtils$.MODULE$.RandomPort(), None$.MODULE$,
                1, false, 1, (short) 1, false));

        Properties linkProps = new Properties();
        if (independentRetention) {
            linkProps.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(","));
        }
        clusterLinkConfig_$eq(createClusterLinkConfig(linkProps));

        // Collaborators: mocked leader endpoint, log, partition and replica manager.
        BlockingSend blockingSend = newMockBlockingSend();
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        AbstractLog log = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition mockPartition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(mockPartition.localLogOrException()).thenReturn(log);
        Mockito.when(mockPartition.appendRecordsToFollower(BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any())).thenReturn(option);
        ReplicaManager mockReplicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(mockReplicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(mockPartition);
        Mockito.when(mockReplicaManager.brokerTopicStats()).thenReturn(new BrokerTopicStats());
        Mockito.when(mockReplicaManager.appendRecordsToFollowerReplica((TopicPartition) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any())).thenCallRealMethod();

        ClusterLinkFetcher fetcher = createReplicaFetcherThread("link", 0, brokerEndPoint(), brokerConfig, failedPartitions(), pausedPartitions(), new ExponentialBackoff(brokerConfig.replicaFetchBackoffMs().longValue(), 2, brokerConfig.replicaFetchBackoffMaxMs().longValue(), 0.0d), mockReplicaManager, new Metrics(), Time.SYSTEM, (ReplicaQuota) Mockito.mock(ReplicaQuota.class), blockingSend, None$.MODULE$, createReplicaFetcherThread$default$14());

        TopicPartition tp0 = new TopicPartition("testTopic", 0);
        TopicPartition tp1 = new TopicPartition("testTopic", 1);
        // A single PartitionData instance is reused; only its partition index is changed between calls.
        FetchResponseData.PartitionData fetchedData = new FetchResponseData.PartitionData().setRecords(MemoryRecords.withRecords((byte) 2, 0L, Compression.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))})).setHighWatermark(100L).setLogStartOffset(50L);
        fetcher.processPartitionData(tp0, 0L, fetchedData.setPartitionIndex(0));
        fetcher.processPartitionData(tp1, 0L, fetchedData.setPartitionIndex(1));
        fetcher.doWork();

        if (expectDelayedFetchCompletion) {
            ((ReplicaManager) Mockito.verify(mockReplicaManager, Mockito.times(1))).completeDelayedFetchRequests((Seq) ArgumentMatchers.eq(new $colon.colon(tp0, new $colon.colon(tp1, Nil$.MODULE$))));
        } else {
            ((ReplicaManager) Mockito.verify(mockReplicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        }
        ((Partition) Mockito.verify(mockPartition, Mockito.times(2))).appendRecordsToFollower(BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.eq(independentRetention ? Optional.empty() : Optional.of(BoxesRunTime.boxToLong(50L))));
        // doWork() must have drained both pending-notification collections.
        Assertions.assertTrue(fetcher.partitionsWithNewHighWatermark().isEmpty());
        Assertions.assertTrue(fetcher.partitionsWithNewRecords().isEmpty());
    }

    /** Mirror truncation validation with a random source topic id and truncation below HWM allowed. */
    @Test
    public void testValidateMirrorTruncationWithTopicIds() {
        final boolean useTopicIds = true;
        final boolean allowTruncationBelowHwm = true;
        // NOTE(review): third flag appears to select the cloud-link variant (cf. the ...ForCloudLink test) — confirm.
        final boolean cloudLink = false;
        verifyValidateMirrorTruncation(useTopicIds, allowTruncationBelowHwm, cloudLink);
    }

    /** Mirror truncation validation with the zero source topic id and truncation below HWM allowed. */
    @Test
    public void testValidateMirrorTruncationWithoutTopicIds() {
        final boolean useTopicIds = false;
        final boolean allowTruncationBelowHwm = true;
        // NOTE(review): third flag appears to select the cloud-link variant (cf. the ...ForCloudLink test) — confirm.
        final boolean cloudLink = false;
        verifyValidateMirrorTruncation(useTopicIds, allowTruncationBelowHwm, cloudLink);
    }

    /**
     * Mirror truncation validation with {@code AllowTruncationBelowHWMProp} set to "false" in the
     * link config and the final helper flag off.
     */
    @Test
    public void testDisallowMirrorTruncationBelowHWMForHybridLink() {
        final boolean useTopicIds = true;
        final boolean allowTruncationBelowHwm = false;
        // NOTE(review): judging by the test names, false selects the hybrid-link variant — confirm.
        final boolean cloudLink = false;
        Properties linkProps = new Properties();
        linkProps.put(ClusterLinkConfig$.MODULE$.AllowTruncationBelowHWMProp(), "false");
        clusterLinkConfig_$eq(createClusterLinkConfig(linkProps));
        verifyValidateMirrorTruncation(useTopicIds, allowTruncationBelowHwm, cloudLink);
    }

    /**
     * Mirror truncation validation with {@code AllowTruncationBelowHWMProp} set to "false" in the
     * link config and the final helper flag on.
     */
    @Test
    public void testDisallowMirrorTruncationBelowHWMForCloudLink() {
        final boolean useTopicIds = true;
        final boolean allowTruncationBelowHwm = false;
        // NOTE(review): judging by the test names, true selects the cloud-link variant — confirm.
        final boolean cloudLink = true;
        Properties linkProps = new Properties();
        linkProps.put(ClusterLinkConfig$.MODULE$.AllowTruncationBelowHWMProp(), "false");
        clusterLinkConfig_$eq(createClusterLinkConfig(linkProps));
        verifyValidateMirrorTruncation(useTopicIds, allowTruncationBelowHwm, cloudLink);
    }

    /**
     * Overrides the inherited test to run the shared truncation-below-HWM metric check with its
     * flag set to {@code true}.
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    @Test
    public void testTruncationLessThanHWMMetric() {
        // NOTE(review): the flag's meaning is defined by verifyTruncationLessThanHWMMetric, outside this view.
        final boolean helperFlag = true;
        verifyTruncationLessThanHWMMetric(helperFlag);
    }

    /**
     * Walks a single mirror partition through a fixed sequence of truncation scenarios, resetting
     * the mocked log with {@code setupLog$1} before each one and checking whether the truncation
     * is accepted, fails with a specific {@code MirrorFailureType}, or fails with a retriable /
     * non-retriable leader-endpoint exception.
     *
     * <p>NOTE(review): the numeric arguments passed to the {@code setupLog$1} / {@code verify...$1}
     * helpers (offsets and epochs) are interpreted by those helpers, which are outside this view.
     *
     * @param z  when true the source topic id is a random UUID, otherwise {@code Uuid.ZERO_UUID};
     *           scenarios without an epoch only fail on topic-id / endpoint errors when true
     * @param z2 value of {@code confluent.cluster.link.allow.truncation.below.hwm}; when false the
     *           otherwise-valid scenarios must fail with {@code TruncationBelowHighWatermark}
     * @param z3 forwarded unchanged as the last argument of {@code setupFetcherManagerAndPartitions}
     */
    private void verifyValidateMirrorTruncation(boolean z, boolean z2, boolean z3) {
        final boolean withTopicIds = z;
        final boolean allowBelowHwm = z2;

        MockTime time = new MockTime();
        Tuple2<ClusterLinkFetcherManager, scala.collection.mutable.Set<Partition>> managerAndPartitions = setupFetcherManagerAndPartitions(time, 1, MetadataVersion.latestTesting(), false, (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("confluent.cluster.link.allow.truncation.below.hwm"), Boolean.toString(allowBelowHwm))})), z3);
        if (managerAndPartitions == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) managerAndPartitions._1();
        Partition mirrorPartition = (Partition) ((scala.collection.mutable.Set) managerAndPartitions._2()).head();
        mirrorPartition.leaderReplicaIdOpt_$eq(new Some(BoxesRunTime.boxToInteger(brokerEndPoint().id())));
        mirrorPartition.partitionState_$eq(new CommittedPartitionState((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerEndPoint().id()})), LeaderRecoveryState.RECOVERED, new Some(new ClusterLinkState(Uuid.randomUuid(), TopicLinkMirror$.MODULE$, new PartitionLinkState(1, false, MirrorTopicError.NO_ERROR)))));
        TopicPartition tp = mirrorPartition.topicPartition();
        AbstractLog log = (AbstractLog) mirrorPartition.log().get();
        if (withTopicIds) {
            sourceTopicId_$eq(Uuid.randomUuid());
        } else {
            sourceTopicId_$eq(Uuid.ZERO_UUID);
        }
        TestUtils.setFieldValue(mirrorPartition, "linkedTopicId", sourceTopicId());
        ConcurrentHashMap linkedPartitions = (ConcurrentHashMap) TestUtils.fieldValue(fetcherManager, ClusterLinkFetcherManager.class, "linkedPartitions");
        setupFetcherThread(time, fetcherManager, 1);

        // Scenario: accepted truncation.
        setupLog$1(5L, 10L, new Some(BoxesRunTime.boxToInteger(0)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        verifyValidTruncation$1(9L, new Some(BoxesRunTime.boxToInteger(0)), tp, log, linkedPartitions);

        // Scenarios: three variants that must be rejected as UnexpectedTruncation.
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(0)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(0)), MirrorFailureType$UnexpectedTruncation$.MODULE$, tp, log, fetcherManager, linkedPartitions);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(1)), MirrorFailureType$UnexpectedTruncation$.MODULE$, tp, log, fetcherManager, linkedPartitions);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(2)), MirrorFailureType$UnexpectedTruncation$.MODULE$, tp, log, fetcherManager, linkedPartitions);

        // Scenario: accepted only when truncation below the high watermark is allowed.
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(0)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        if (allowBelowHwm) {
            verifyValidTruncation$1(5L, new Some(BoxesRunTime.boxToInteger(1)), tp, log, linkedPartitions);
        } else {
            verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(1)), MirrorFailureType$TruncationBelowHighWatermark$.MODULE$, tp, log, fetcherManager, linkedPartitions);
        }

        // Scenario: leader endpoint throws TimeoutException -> retriable failure.
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        leaderEndPointException_$eq(new Some(new TimeoutException()));
        verifyRetriableTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(3)), tp, log, fetcherManager, linkedPartitions);

        // Scenario: leader endpoint throws InvalidRequestException -> non-retriable failure.
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        leaderEndPointException_$eq(new Some(new InvalidRequestException("Test exception")));
        verifyNonRetriableTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(3)), InvalidRequestException.class, tp, fetcherManager, linkedPartitions);

        // Scenario: local end offset for epoch 1 is 4, source end offset is 5 -> UnexpectedTruncation.
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        Mockito.when(log.endOffsetForEpoch(1)).thenReturn(new Some(new OffsetAndEpoch(4L, 1)));
        epochEndOffsets_$eq((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), new OffsetForLeaderEpochResponseData.EpochEndOffset().setEndOffset(5L))})));
        verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(3)), MirrorFailureType$UnexpectedTruncation$.MODULE$, tp, log, fetcherManager, linkedPartitions);

        // Scenario: local end offset for epoch 1 is 6, source end offset is 5 -> depends on the HWM setting.
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        Mockito.when(log.endOffsetForEpoch(1)).thenReturn(new Some(new OffsetAndEpoch(6L, 1)));
        epochEndOffsets_$eq((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), new OffsetForLeaderEpochResponseData.EpochEndOffset().setEndOffset(5L))})));
        if (allowBelowHwm) {
            verifyValidTruncation$1(5L, new Some(BoxesRunTime.boxToInteger(3)), tp, log, linkedPartitions);
        } else {
            verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(3)), MirrorFailureType$TruncationBelowHighWatermark$.MODULE$, tp, log, fetcherManager, linkedPartitions);
        }

        // Scenario: no epoch, source topic id replaced by a new random one.
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        Uuid originalSourceTopicId = sourceTopicId();
        sourceTopicId_$eq(Uuid.randomUuid());
        if (withTopicIds) {
            verifyTruncationFailure$1(5L, None$.MODULE$, MirrorFailureType$SourceTopicIdChanged$.MODULE$, tp, log, fetcherManager, linkedPartitions);
        } else {
            verifyValidTruncation$1(5L, None$.MODULE$, tp, log, linkedPartitions);
        }

        // Scenario: no epoch, original topic id restored, endpoint throws TimeoutException.
        sourceTopicId_$eq(originalSourceTopicId);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        leaderEndPointException_$eq(new Some(new TimeoutException()));
        if (withTopicIds) {
            verifyRetriableTruncationFailure$1(5L, None$.MODULE$, tp, log, fetcherManager, linkedPartitions);
        } else {
            verifyValidTruncation$1(5L, None$.MODULE$, tp, log, linkedPartitions);
        }

        // Scenario: no epoch, endpoint throws InvalidRequestException.
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        leaderEndPointException_$eq(new Some(new InvalidRequestException("Test exception")));
        if (withTopicIds) {
            verifyNonRetriableTruncationFailure$1(5L, None$.MODULE$, InvalidRequestException.class, tp, fetcherManager, linkedPartitions);
        } else {
            verifyValidTruncation$1(5L, None$.MODULE$, tp, log, linkedPartitions);
        }

        // Scenario: no epoch, source topic id set to the zero UUID.
        sourceTopicId_$eq(Uuid.ZERO_UUID);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        if (allowBelowHwm) {
            verifyValidTruncation$1(5L, None$.MODULE$, tp, log, linkedPartitions);
        } else {
            verifyTruncationFailure$1(5L, None$.MODULE$, MirrorFailureType$TruncationBelowHighWatermark$.MODULE$, tp, log, fetcherManager, linkedPartitions);
        }

        // Scenario: no epoch, original source topic id restored.
        sourceTopicId_$eq(originalSourceTopicId);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), log, withTopicIds, fetcherManager, tp, linkedPartitions);
        if (allowBelowHwm) {
            verifyValidTruncation$1(5L, None$.MODULE$, tp, log, linkedPartitions);
        } else {
            verifyTruncationFailure$1(5L, None$.MODULE$, MirrorFailureType$TruncationBelowHighWatermark$.MODULE$, tp, log, fetcherManager, linkedPartitions);
        }
    }

    /**
     * Always returns {@code true}.
     *
     * <p>NOTE(review): the {@code $default$2} suffix matches the Scala compiler's naming for
     * default-argument accessors, so this is presumably the default for the second parameter of
     * {@code verifyValidateMirrorTruncation} — confirm against the Scala source.
     *
     * @return {@code true}
     */
    private boolean verifyValidateMirrorTruncation$default$2() {
        return true;
    }

    /**
     * Always returns {@code false}.
     *
     * <p>NOTE(review): the {@code $default$3} suffix matches the Scala compiler's naming for
     * default-argument accessors, so this is presumably the default for the third parameter of
     * {@code verifyValidateMirrorTruncation} — confirm against the Scala source.
     *
     * @return {@code false}
     */
    private boolean verifyValidateMirrorTruncation$default$3() {
        return false;
    }

    /**
     * Asserts the allowed version range of the two request builders.
     *
     * <p>The OffsetsForLeaderEpoch builder must allow versions from 3 up to the latest version of
     * {@code ApiKeys.OFFSET_FOR_LEADER_EPOCH}; the ListOffsets builder must allow versions from 0
     * up to the latest version of {@code ApiKeys.LIST_OFFSETS}.
     *
     * @param metadataVersion not read by this override
     * @param builder the OffsetsForLeaderEpoch request builder to inspect
     * @param builder2 the ListOffsets request builder to inspect
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public void verifyOffsetRequestVersion(MetadataVersion metadataVersion, OffsetsForLeaderEpochRequest.Builder builder, ListOffsetsRequest.Builder builder2) {
        // OffsetsForLeaderEpoch bounds.
        final short epochOldest = builder.oldestAllowedVersion();
        Assertions.assertEquals(3, epochOldest);
        final short epochLatestExpected = ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion();
        final short epochLatest = builder.latestAllowedVersion();
        Assertions.assertEquals(epochLatestExpected, epochLatest);

        // ListOffsets bounds.
        final short listOldest = builder2.oldestAllowedVersion();
        Assertions.assertEquals(0, listOldest);
        final short listLatestExpected = ApiKeys.LIST_OFFSETS.latestVersion();
        final short listLatest = builder2.latestAllowedVersion();
        Assertions.assertEquals(listLatestExpected, listLatest);
    }

    /**
     * Builds a new {@link ClusterLinkFetchResponseAllocator} from the given broker config and
     * fetcher pool.
     *
     * @param kafkaConfig config handed to the allocator's constructor
     * @param fetcherPool fetcher pool handed to the allocator's constructor
     * @return a freshly constructed allocator
     */
    public ClusterLinkFetchResponseAllocator kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseAllocator(KafkaConfig kafkaConfig, FetcherPool fetcherPool) {
        final ClusterLinkFetchResponseAllocator allocator = new ClusterLinkFetchResponseAllocator(kafkaConfig, fetcherPool);
        return allocator;
    }

    /**
     * Returns the {@code FetcherPool$Default$} singleton.
     *
     * <p>NOTE(review): by the Scala {@code $default$2} naming convention this is presumably the
     * default for the {@code fetcherPool} parameter of the allocator factory — confirm.
     *
     * @return {@code FetcherPool$Default$.MODULE$}
     */
    public FetcherPool kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseAllocator$default$2() {
        return FetcherPool$Default$.MODULE$;
    }

    /**
     * Reads the {@code linkedPartitions} field of the given fetcher manager via
     * {@code TestUtils.fieldValue} and returns it as a map.
     *
     * @param clusterLinkFetcherManager manager whose field is read
     * @return the value of the manager's {@code linkedPartitions} field
     */
    private ConcurrentHashMap<TopicPartition, PartitionAndState> allLinkedPartitions(ClusterLinkFetcherManager clusterLinkFetcherManager) {
        final Object linkedPartitions = TestUtils.fieldValue(clusterLinkFetcherManager, ClusterLinkFetcherManager.class, "linkedPartitions");
        return (ConcurrentHashMap) linkedPartitions;
    }

    /**
     * Reads the {@code degradedPartitions} field of the given fetcher manager via
     * {@code TestUtils.fieldValue} and returns it as a map.
     *
     * @param clusterLinkFetcherManager manager whose field is read
     * @return the value of the manager's {@code degradedPartitions} field
     */
    private ConcurrentHashMap<TopicPartition, MirrorFailureType> degradedPartitions(ClusterLinkFetcherManager clusterLinkFetcherManager) {
        final Object degraded = TestUtils.fieldValue(clusterLinkFetcherManager, ClusterLinkFetcherManager.class, "degradedPartitions");
        return (ConcurrentHashMap) degraded;
    }

    /**
     * Compares a Java map against the Java view of a Scala map.
     *
     * @param map Scala map, converted with {@code MapHasAsJava(...).asJava()} before comparing
     * @param map2 Java map to compare; may be {@code null}
     * @return {@code map2.equals(converted)} when {@code map2} is non-null, otherwise whether the
     *     converted map is itself {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$testFetchRequestPartitionMaxSize$3(Map map, java.util.Map map2) {
        final java.util.Map expected = CollectionConverters$.MODULE$.MapHasAsJava(map).asJava();
        if (map2 != null) {
            return map2.equals(expected);
        }
        return expected == null;
    }

    /**
     * Creates the {@link Partition} for {@code ("topic", i)}, wires it into the test's replica
     * manager stubs, and adds it to {@code set}.
     *
     * <p>The partition is built with a mocked listener name and delayed operations, a fresh
     * {@link ZkMetadataCache}, a {@code MockAlterPartitionManager}, and {@code None} for every
     * optional constructor argument. Afterwards:
     * <ul>
     *   <li>{@code abstractLog} is installed as the partition's log;</li>
     *   <li>the replica manager is stubbed so {@code localLogOrException},
     *       {@code onlinePartition} and {@code getPartitionOrException} resolve this topic
     *       partition to the log / partition created here;</li>
     *   <li>the partition's {@code leaderEpoch} field is set to 2 via reflection.</li>
     * </ul>
     *
     * @param clusterLinkFetcherThreadTest test instance whose replica manager is stubbed
     * @param mockTime time source passed to the partition
     * @param alterPartitionListener listener passed to the partition
     * @param logManager log manager passed to the partition
     * @param abstractLog log installed on the partition and returned by the replica manager stub
     * @param set mutable set the new partition is added to
     * @param i partition index within topic {@code "topic"}
     * @return the result of {@code set.$plus$eq(partition)}
     */
    public static final /* synthetic */ scala.collection.mutable.Set $anonfun$setupFetcherManagerAndPartitions$1(ClusterLinkFetcherThreadTest clusterLinkFetcherThreadTest, MockTime mockTime, AlterPartitionListener alterPartitionListener, LogManager logManager, AbstractLog abstractLog, scala.collection.mutable.Set set, int i) {
        TopicPartition topicPartition = new TopicPartition("topic", i);
        MetadataVersion latestTesting = MetadataVersion.latestTesting();
        ListenerName listenerName = (ListenerName) Mockito.mock(ListenerName.class);
        JFunction0.mcJ.sp spVar = () -> 1L;
        DelayedOperations delayedOperations = (DelayedOperations) Mockito.mock(DelayedOperations.class);
        ZkMetadataCache zkMetadataCache = new ZkMetadataCache(0, MetadataVersion.latestTesting(), BrokerFeatures$.MODULE$.createEmpty(), false, false);
        TestUtils.MockAlterPartitionManager mockAlterPartitionManager = new TestUtils.MockAlterPartitionManager();

        // Every optional constructor argument is None; the decompiler's never-read temporaries
        // that used to hold these values (and companion-object references) have been dropped.
        Partition partition = new Partition(topicPartition, 10000L, latestTesting, listenerName, 0, spVar, mockTime, alterPartitionListener, delayedOperations, zkMetadataCache, logManager, None$.MODULE$, None$.MODULE$, None$.MODULE$, mockAlterPartitionManager, None$.MODULE$, false, None$.MODULE$, None$.MODULE$, None$.MODULE$);
        partition.log_$eq(new Some(abstractLog));

        // Make the replica manager resolve this topic partition to the objects built above.
        ReplicaManager replicaManager = clusterLinkFetcherThreadTest.kafka$server$link$ClusterLinkFetcherThreadTest$$replicaManager();
        Mockito.when(replicaManager.localLogOrException(topicPartition)).thenReturn(abstractLog);
        Mockito.when(replicaManager.onlinePartition(topicPartition)).thenReturn(new Some(partition));
        Mockito.when(replicaManager.getPartitionOrException(topicPartition)).thenReturn(partition);

        // leaderEpoch is set reflectively to 2.
        org.apache.kafka.test.TestUtils.setFieldValue(partition, "leaderEpoch", BoxesRunTime.boxToInteger(2));
        return set.$plus$eq(partition);
    }

    /**
     * Forwards to {@code clusterLinkFetcherThread.updateFetcherLagStats(topicPartition, i)}.
     *
     * @param topicPartition partition whose lag stats are updated
     * @param i value passed as the second argument of {@code updateFetcherLagStats}
     *     (presumably the lag — confirm against the fetcher thread API)
     * @param clusterLinkFetcherThread fetcher thread to update
     */
    public static final /* synthetic */ void $anonfun$testAutoTuneFetchers$2(TopicPartition topicPartition, int i, ClusterLinkFetcherThread clusterLinkFetcherThread) {
        clusterLinkFetcherThread.updateFetcherLagStats(topicPartition, i);
    }

    /**
     * Records a byte rate and lag for partitions {@code 0 .. i2 - 1} of topic {@code "topic"} and
     * updates the running totals.
     *
     * <p>For each partition, {@code j} is reported to the test's fetcher thread via
     * {@code updateFetcherByteRate} and added to {@code longRef}; {@code i} is reported to every
     * fetcher thread via {@code updateFetcherLagStats}. After the loop {@code longRef} is added
     * to {@code longRef2} and {@code doubleRef} is set to {@code longRef / j2}.
     *
     * @param j value reported per partition through {@code updateFetcherByteRate}
     * @param i value reported per partition through {@code updateFetcherLagStats}
     * @param longRef reset to 0, then accumulates {@code j} once per partition
     * @param i2 number of partitions to update
     * @param longRef2 running total; incremented by the final value of {@code longRef}
     * @param doubleRef receives {@code longRef / j2}
     * @param j2 divisor used for {@code doubleRef}
     */
    private final void updatePartitionThroughputAndLag$1(long j, int i, LongRef longRef, int i2, LongRef longRef2, DoubleRef doubleRef, long j2) {
        longRef.elem = 0L;
        final int lastPartitionIndex = i2 - 1;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), lastPartitionIndex).foreach$mVc$sp(partitionIndex -> {
            final TopicPartition currentPartition = new TopicPartition("topic", partitionIndex);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherByteRate(currentPartition, j);
            longRef.elem = longRef.elem + j;
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$allfetcherThreads().foreach(fetcher -> {
                fetcher.updateFetcherLagStats(currentPartition, i);
                return BoxedUnit.UNIT;
            });
        });
        longRef2.elem = longRef2.elem + longRef.elem;
        // NOTE(review): long / long is integer division; the quotient is truncated before being
        // widened to double. Kept as-is to preserve behavior.
        doubleRef.elem = longRef.elem / j2;
    }

    /**
     * Builds a {@link CommittedPartitionState} whose broker-id set contains only
     * {@code brokerEndPoint().id()}, with {@code LeaderRecoveryState.RECOVERED} and a mirror
     * {@link ClusterLinkState} carrying a random link id and a {@link PartitionLinkState} built
     * from {@code intRef.elem}, {@code false} and {@code MirrorTopicError.NO_ERROR}.
     *
     * @param intRef holder whose current value is passed as the first argument of
     *     {@code PartitionLinkState}
     * @return the newly built partition state
     */
    private final CommittedPartitionState newPartitionState$1(IntRef intRef) {
        final scala.collection.immutable.Set brokerIds = (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerEndPoint().id()}));
        final Uuid linkId = Uuid.randomUuid();
        final PartitionLinkState partitionLinkState = new PartitionLinkState(intRef.elem, false, MirrorTopicError.NO_ERROR);
        final ClusterLinkState clusterLinkState = new ClusterLinkState(linkId, TopicLinkMirror$.MODULE$, partitionLinkState);
        return new CommittedPartitionState(brokerIds, LeaderRecoveryState.RECOVERED, new Some(clusterLinkState));
    }

    /**
     * Increments {@code intRef} and propagates the new value through the fetcher manager's
     * metadata and the partition's state.
     *
     * <p>Steps, in order: increment {@code intRef.elem}; update the manager's current metadata
     * with a response for topic {@code "topic"} (one partition) whose per-partition epoch
     * supplier returns {@code intRef.elem}; notify the manager via {@code onNewMetadata}; replace
     * the partition's state with a fresh one from {@code newPartitionState$1}.
     *
     * @param intRef epoch holder; incremented by one
     * @param clusterLinkFetcherManager manager whose metadata is updated and notified
     * @param mockTime supplies the timestamp for the metadata update
     * @param partition partition whose state is replaced
     */
    private final void bumpSourceEpoch$1(IntRef intRef, ClusterLinkFetcherManager clusterLinkFetcherManager, MockTime mockTime, Partition partition) {
        intRef.elem++;

        final MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(
            "cluster",
            1,
            Collections.singletonMap("topic", Errors.NONE),
            Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(1)),
            ignoredPartition -> Predef$.MODULE$.int2Integer(intRef.elem),
            MetadataResponse.PartitionMetadata::new,
            ApiKeys.METADATA.latestVersion(),
            Collections.emptyMap(),
            true,
            Collections.emptyMap());
        clusterLinkFetcherManager.currentMetadata().update(1, metadataResponse, false, mockTime.milliseconds());
        clusterLinkFetcherManager.onNewMetadata(clusterLinkFetcherManager.currentMetadata().fetch());

        // Same construction as newPartitionState$1, which reads the already-incremented intRef.
        partition.partitionState_$eq(newPartitionState$1(intRef));
    }

    /**
     * Writes {@code intRef.elem} into the partition's {@code leaderEpoch} field via reflection,
     * then re-delivers the manager's current metadata through {@code onNewMetadata}.
     *
     * @param partition partition whose {@code leaderEpoch} field is overwritten
     * @param intRef holder of the epoch value to write
     * @param clusterLinkFetcherManager manager that is notified with its current metadata
     */
    private static final void bumpDestEpoch$1(Partition partition, IntRef intRef, ClusterLinkFetcherManager clusterLinkFetcherManager) {
        final Integer epoch = BoxesRunTime.boxToInteger(intRef.elem);
        org.apache.kafka.test.TestUtils.setFieldValue(partition, "leaderEpoch", epoch);
        clusterLinkFetcherManager.onNewMetadata(
            clusterLinkFetcherManager.currentMetadata().fetch());
    }

    /**
     * Asserts that the manager reports the expected mirror state for a partition.
     *
     * <p>The state is obtained from
     * {@code partitionMirrorState(topicPartition, State.ACTIVE, None).state()}.
     *
     * @param state expected mirror state
     * @param clusterLinkFetcherManager manager that is queried
     * @param topicPartition partition whose mirror state is checked
     */
    private static final void verifyReplicaState$1(ReplicaStatus.MirrorInfo.State state, ClusterLinkFetcherManager clusterLinkFetcherManager, TopicPartition topicPartition) {
        final Object actualState = clusterLinkFetcherManager
            .partitionMirrorState(topicPartition, ReplicaStatus.MirrorInfo.State.ACTIVE, None$.MODULE$)
            .state();
        Assertions.assertEquals(state, actualState);
    }

    /**
     * Calls {@code verifyFetcherThreadPartitions} on the test instance for its fetcher thread,
     * using the topic name of {@code topicPartition} and the given set.
     *
     * @param clusterLinkFetcherThreadTest test instance that owns the fetcher thread and the check
     * @param topicPartition partition whose topic name is passed to the check
     * @param set set passed as the third argument of {@code verifyFetcherThreadPartitions}
     */
    public static final /* synthetic */ void $anonfun$verifyWaitingPartitionsDueToNoSourceRecords$2(ClusterLinkFetcherThreadTest clusterLinkFetcherThreadTest, TopicPartition topicPartition, scala.collection.immutable.Set set) {
        final ClusterLinkFetcherThreadTest test = clusterLinkFetcherThreadTest;
        test.verifyFetcherThreadPartitions(
            test.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(),
            topicPartition.topic(),
            set);
    }

    /**
     * Retries {@code verifyFetcherThreadPartitions} for the test's fetcher thread until it stops
     * throwing {@link AssertionError}.
     *
     * <p>After each failed attempt, if more than 10000 ms have elapsed since the first attempt
     * the error is rethrown. Otherwise the attempt is logged at info level (when enabled), the
     * thread sleeps for the current back-off, and the back-off grows by
     * {@code min(backoff, 1000)} ms. The back-off starts at 1 ms.
     *
     * @param set set passed to {@code verifyFetcherThreadPartitions}
     * @param topicPartition partition whose topic name is passed to the check
     */
    private final void verifyFetcherPartitions$1(scala.collection.immutable.Set set, TopicPartition topicPartition) {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        final long startedAtMs = System.currentTimeMillis();
        long backoffMs = 1;
        for (;;) {
            try {
                verifyFetcherThreadPartitions(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), topicPartition.topic(), set);
                return;
            } catch (AssertionError assertionFailure) {
                final long elapsedMs = System.currentTimeMillis() - startedAtMs;
                if (elapsedMs > 10000) {
                    throw assertionFailure;
                }
                if (testUtils.logger().underlying().isInfoEnabled()) {
                    final String message = "Attempt failed, sleeping for " + backoffMs + ", and then retrying.";
                    testUtils.logger().underlying().info(Logging.msgWithLogIdent$(testUtils, message));
                }
                Thread.sleep(backoffMs);
                // Doubles the wait until it reaches 1000 ms, then grows linearly by 1000 ms.
                backoffMs += Math.min(backoffMs, 1000L);
            }
        }
    }

    /**
     * Tells whether the manager no longer reports the partition as waiting for source records.
     *
     * @param clusterLinkFetcherManager manager that is queried
     * @param topicPartition partition to check
     * @return the negation of {@code isWaitingForSourceRecords(topicPartition)}
     */
    public static final /* synthetic */ boolean $anonfun$verifyWaitingPartitionsDueToNoSourceRecords$4(ClusterLinkFetcherManager clusterLinkFetcherManager, TopicPartition topicPartition) {
        final boolean stillWaiting = clusterLinkFetcherManager.isWaitingForSourceRecords(topicPartition);
        return !stillWaiting;
    }

    /**
     * Supplies a fixed failure message.
     *
     * <p>NOTE(review): presumably the message for the wait on
     * {@code $anonfun$verifyWaitingPartitionsDueToNoSourceRecords$4} — the call site is not
     * visible here.
     *
     * @return the constant message text
     */
    public static final /* synthetic */ String $anonfun$verifyWaitingPartitionsDueToNoSourceRecords$5() {
        return "Waiting state not cleared with new source records";
    }

    /**
     * Tells whether the manager's current metadata has an update requested.
     *
     * @param clusterLinkFetcherManager manager whose current metadata is inspected
     * @return the value of {@code currentMetadata().updateRequested()}
     */
    public static final /* synthetic */ boolean $anonfun$verifyWaitingPartitionsDueToNoSourceRecords$6(ClusterLinkFetcherManager clusterLinkFetcherManager) {
        final boolean updateRequested = clusterLinkFetcherManager.currentMetadata().updateRequested();
        return updateRequested;
    }

    /**
     * Supplies a fixed failure message.
     *
     * <p>NOTE(review): presumably the message for the wait on
     * {@code $anonfun$verifyWaitingPartitionsDueToNoSourceRecords$6} — the call site is not
     * visible here.
     *
     * @return the constant message text
     */
    public static final /* synthetic */ String $anonfun$verifyWaitingPartitionsDueToNoSourceRecords$7() {
        return "Metadata not requested";
    }

    /**
     * Submits a task to {@code executorService} that calls {@code maybeNotifyReadyForFetch} on
     * the test's fetcher thread for the first partition in {@code set}.
     *
     * @param z when {@code true}, block for up to 15 seconds until the task completes
     * @param executorService executor that runs the task
     * @param set partitions; only {@code set.head()} is used
     * @return the future of the submitted task (its result value is the boxed integer 0)
     */
    private final Future maybeNotifyOnThreadNotHoldingLock$1(boolean z, ExecutorService executorService, scala.collection.mutable.Set set) {
        final Runnable notifyTask = () -> {
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().maybeNotifyReadyForFetch((Partition) set.head());
        };
        final Future pending = executorService.submit(notifyTask, BoxesRunTime.boxToInteger(0));
        if (!z) {
            return pending;
        }
        pending.get(15L, TimeUnit.SECONDS);
        return pending;
    }

    /**
     * Supplies a fixed failure message.
     *
     * <p>NOTE(review): presumably used by a wait in {@code verifyNotifyReadyForFetch} — the call
     * site is not visible here.
     *
     * @return the constant message text
     */
    public static final /* synthetic */ String $anonfun$verifyNotifyReadyForFetch$5() {
        return "Not waiting on condition";
    }

    /**
     * Predicate that is true for strictly positive values.
     *
     * @param i value to test
     * @return {@code true} if {@code i} is greater than zero
     */
    public static final /* synthetic */ boolean $anonfun$verifyNotifyReadyForFetch$7(int i) {
        return i > 0;
    }

    /**
     * Re-stubs the mocked log and clears per-partition failure state before a truncation check.
     *
     * <p>Stubs {@code highWatermark}, {@code logEndOffset}, {@code latestEpoch} and
     * {@code topicId} on {@code abstractLog}; removes {@code topicPartition} from the manager's
     * failed partitions; clears the partition's {@code lastFailureType}; resets the test's
     * {@code epochEndOffsets} to an empty map and {@code leaderEndPointException} to {@code None}.
     *
     * @param j value the mocked log returns for {@code highWatermark()}
     * @param j2 value the mocked log returns for {@code logEndOffset()}
     * @param option value the mocked log returns for {@code latestEpoch()}
     * @param abstractLog the mocked log to stub
     * @param z when {@code true} the log's {@code topicId()} returns {@code Some(sourceTopicId())},
     *     otherwise {@code None}
     * @param clusterLinkFetcherManager manager whose failed-partition set is cleaned
     * @param topicPartition partition being reset
     * @param concurrentHashMap map from topic partition to {@code PartitionAndState}
     */
    private final void setupLog$1(long j, long j2, Option option, AbstractLog abstractLog, boolean z, ClusterLinkFetcherManager clusterLinkFetcherManager, TopicPartition topicPartition, ConcurrentHashMap concurrentHashMap) {
        // Stub the log's offsets and epoch.
        Mockito.when(abstractLog.highWatermark()).thenReturn(j);
        Mockito.when(abstractLog.logEndOffset()).thenReturn(j2);
        Mockito.when(abstractLog.latestEpoch()).thenReturn(option);

        final Option stubbedTopicId;
        if (z) {
            stubbedTopicId = new Some(sourceTopicId());
        } else {
            stubbedTopicId = None$.MODULE$;
        }
        Mockito.when(abstractLog.topicId()).thenReturn(stubbedTopicId);

        // Forget any failure recorded for this partition.
        clusterLinkFetcherManager.failedPartitions().removeAll((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition})));
        final PartitionAndState partitionState = (PartitionAndState) concurrentHashMap.get(topicPartition);
        partitionState.lastFailureType_$eq(None$.MODULE$);

        // Reset the test's own epoch-end-offset and leader-endpoint-exception settings.
        epochEndOffsets_$eq(Predef$.MODULE$.Map().empty());
        leaderEndPointException_$eq(None$.MODULE$);
    }

    /**
     * Asserts that {@code validateMirrorTruncation} accepts a truncation and records no failure.
     *
     * @param j first argument of the {@code OffsetTruncationState} under test
     * @param option third argument of the {@code OffsetTruncationState} under test
     * @param topicPartition partition being validated
     * @param abstractLog log whose high watermark, end offset and latest epoch appear in the
     *     failure message
     * @param concurrentHashMap map from topic partition to {@code PartitionAndState}; the entry's
     *     {@code lastFailureType} must be {@code None}
     */
    private final void verifyValidTruncation$1(long j, Option option, TopicPartition topicPartition, AbstractLog abstractLog, ConcurrentHashMap concurrentHashMap) {
        final OffsetTruncationState truncation = new OffsetTruncationState(j, true, option);
        final boolean accepted = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().validateMirrorTruncation(topicPartition, truncation);
        final String failureMessage = "Truncation validation failed for " + truncation
            + " with hwm " + abstractLog.highWatermark()
            + " leo " + abstractLog.logEndOffset()
            + " latestEpoch " + abstractLog.latestEpoch();
        Assertions.assertTrue(accepted, failureMessage);

        final PartitionAndState partitionState = (PartitionAndState) concurrentHashMap.get(topicPartition);
        Assertions.assertEquals(None$.MODULE$, partitionState.lastFailureType());
    }

    /**
     * Asserts that {@code validateMirrorTruncation} rejects a truncation, that the partition is
     * in the manager's failed partitions, and that the expected failure type was recorded.
     *
     * @param j first argument of the {@code OffsetTruncationState} under test
     * @param option third argument of the {@code OffsetTruncationState} under test
     * @param mirrorFailureType failure type expected in the partition's {@code lastFailureType}
     * @param topicPartition partition being validated
     * @param abstractLog log whose high watermark, end offset and latest epoch appear in the
     *     failure message
     * @param clusterLinkFetcherManager manager whose failed partitions must contain the partition
     * @param concurrentHashMap map from topic partition to {@code PartitionAndState}
     */
    private final void verifyTruncationFailure$1(long j, Option option, MirrorFailureType mirrorFailureType, TopicPartition topicPartition, AbstractLog abstractLog, ClusterLinkFetcherManager clusterLinkFetcherManager, ConcurrentHashMap concurrentHashMap) {
        final OffsetTruncationState truncation = new OffsetTruncationState(j, true, option);
        final boolean accepted = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().validateMirrorTruncation(topicPartition, truncation);
        final String failureMessage = "Truncation validation did not fail for " + truncation
            + " with hwm " + abstractLog.highWatermark()
            + " leo " + abstractLog.logEndOffset()
            + " latestEpoch " + abstractLog.latestEpoch();
        Assertions.assertFalse(accepted, failureMessage);

        final boolean markedFailed = clusterLinkFetcherManager.failedPartitions().contains(topicPartition);
        Assertions.assertTrue(markedFailed);

        final PartitionAndState partitionState = (PartitionAndState) concurrentHashMap.get(topicPartition);
        Assertions.assertEquals(new Some(mirrorFailureType), partitionState.lastFailureType());
    }

    /**
     * Asserts the outcome expected for a retriable error during truncation validation:
     * {@code validateMirrorTruncation} returns {@code false}, the partition is NOT in the
     * manager's failed partitions, and {@code lastFailureType} is
     * {@code Some(LinkNotAvailable)}.
     *
     * @param j first argument of the {@code OffsetTruncationState} under test
     * @param option third argument of the {@code OffsetTruncationState} under test
     * @param topicPartition partition being validated
     * @param abstractLog log whose high watermark, end offset and latest epoch appear in the
     *     failure message
     * @param clusterLinkFetcherManager manager whose failed partitions must not contain the
     *     partition
     * @param concurrentHashMap map from topic partition to {@code PartitionAndState}
     */
    private final void verifyRetriableTruncationFailure$1(long j, Option option, TopicPartition topicPartition, AbstractLog abstractLog, ClusterLinkFetcherManager clusterLinkFetcherManager, ConcurrentHashMap concurrentHashMap) {
        final OffsetTruncationState truncation = new OffsetTruncationState(j, true, option);
        final boolean accepted = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().validateMirrorTruncation(topicPartition, truncation);
        final String failureMessage = "Truncation validation did not fail for retriable error with " + truncation
            + " with hwm " + abstractLog.highWatermark()
            + " leo " + abstractLog.logEndOffset()
            + " latestEpoch " + abstractLog.latestEpoch();
        Assertions.assertFalse(accepted, failureMessage);

        final boolean markedFailed = clusterLinkFetcherManager.failedPartitions().contains(topicPartition);
        Assertions.assertFalse(markedFailed);

        final PartitionAndState partitionState = (PartitionAndState) concurrentHashMap.get(topicPartition);
        Assertions.assertEquals(new Some(MirrorFailureType$LinkNotAvailable$.MODULE$), partitionState.lastFailureType());
    }

    /**
     * Asserts the outcome expected for a non-retriable error during truncation validation:
     * {@code validateMirrorTruncation} throws {@code cls}, the partition is NOT in the manager's
     * failed partitions, and {@code lastFailureType} stays {@code None}.
     *
     * @param j first argument of the {@code OffsetTruncationState} under test
     * @param option third argument of the {@code OffsetTruncationState} under test
     * @param cls exception type that validation must throw
     * @param topicPartition partition being validated
     * @param clusterLinkFetcherManager manager whose failed partitions must not contain the
     *     partition
     * @param concurrentHashMap map from topic partition to {@code PartitionAndState}
     */
    private final void verifyNonRetriableTruncationFailure$1(long j, Option option, Class cls, TopicPartition topicPartition, ClusterLinkFetcherManager clusterLinkFetcherManager, ConcurrentHashMap concurrentHashMap) {
        final OffsetTruncationState truncation = new OffsetTruncationState(j, true, option);
        Assertions.assertThrows(cls, () -> {
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().validateMirrorTruncation(topicPartition, truncation);
        });

        final boolean markedFailed = clusterLinkFetcherManager.failedPartitions().contains(topicPartition);
        Assertions.assertFalse(markedFailed);

        final PartitionAndState partitionState = (PartitionAndState) concurrentHashMap.get(topicPartition);
        Assertions.assertEquals(None$.MODULE$, partitionState.lastFailureType());
    }

    /**
     * Creates the test instance and calls {@code startup()} on the object returned by
     * {@code clusterLinkMetrics()}.
     */
    public ClusterLinkFetcherThreadTest() {
        clusterLinkMetrics().startup();
    }
}
