package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.server.ClusterLinkRequestQuota;
import kafka.server.KafkaConfig;
import kafka.server.UnboundedClusterLinkRequestQuota$;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkTask;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.$less$colon$less$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.SeqOps;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Map$;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkBatchTasksTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\t\u001de\u0001B\u001c9\u0001}BQA\u0012\u0001\u0005\u0002\u001dCqA\u0013\u0001C\u0002\u0013%1\n\u0003\u0004P\u0001\u0001\u0006I\u0001\u0014\u0005\b!\u0002\u0011\r\u0011\"\u0003R\u0011\u0019)\u0006\u0001)A\u0005%\"9a\u000b\u0001b\u0001\n\u00139\u0006BB3\u0001A\u0003%\u0001\fC\u0004g\u0001\t\u0007I\u0011B4\t\r-\u0004\u0001\u0015!\u0003i\u0011\u001da\u0007A1A\u0005\n\u001dDa!\u001c\u0001!\u0002\u0013A\u0007b\u00028\u0001\u0005\u0004%Ia\u001c\u0005\u0007g\u0002\u0001\u000b\u0011\u00029\t\u000fQ\u0004!\u0019!C\u0005k\"1\u0011\u0010\u0001Q\u0001\nYDqA\u001f\u0001C\u0002\u0013%1\u0010\u0003\u0004��\u0001\u0001\u0006I\u0001 \u0005\n\u0003\u0003\u0001\u0001\u0019!C\u0005\u0003\u0007A\u0011\"!\u0006\u0001\u0001\u0004%I!a\u0006\t\u0011\u0005\r\u0002\u0001)Q\u0005\u0003\u000bA\u0011\"!\n\u0001\u0005\u0004%I!a\n\t\u0011\u0005E\u0002\u0001)A\u0005\u0003SA\u0011\"a\r\u0001\u0005\u0004%I!!\u000e\t\u0011\u0005u\u0002\u0001)A\u0005\u0003oA\u0011\"a\u0010\u0001\u0005\u0004%I!!\u0011\t\u0011\u0005%\u0003\u0001)A\u0005\u0003\u0007B1\"a\u0013\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002N!Y\u0011Q\u000b\u0001A\u0002\u0003\u0007I\u0011BA,\u0011-\tY\u0006\u0001a\u0001\u0002\u0003\u0006K!a\u0014\t\u0013\u0005u\u0003A1A\u0005\n\u0005}\u0003\u0002CAE\u0001\u0001\u0006I!!\u0019\t\u0013\u0005-\u0005A1A\u0005\n\u00055\u0005\u0002CAQ\u0001\u0001\u0006I!a$\t\u0013\u0005\r\u0006A1A\u0005\n\u0005\u0015\u0006\u0002CA_\u0001\u0001\u0006I!a*\t\u000f\u0005}\u0006\u0001\"\u0001\u0002B\"9\u0011\u0011\u001c\u0001\u0005\u0002\u0005\u0005\u0007bBAr\u0001\u0011\u0005\u0011\u0011\u0019\u0005\b\u0003[\u0004A\u0011AAa\u0011\u001d\t\t\u0010\u0001C\u0001\u0003\u0003Dq!!>\u0001\t\u0003\t\t\rC\u0004\u0002z\u0002!\t!!1\t\u000f\u0005u\b\u0001\"\u0003\u0002��\"I!1\u0002\u0001\u0012\u0002\u0013%!Q\u0002\u0005\b\u0005G\u0001A\u0011\u0002B\u0013\u0011\u001d\u0011\t\u0004\u0001C\u0005\u0005gAqA!\u0010\u0001\t\u0013\u0011y\u0004C\u0004\u0003L\u0001!I!!1\t\u000f\t5\u0003\u0001\"\u0003\u0003P!9!1\f\u0001\u0005\n\tu\u0003b\u0002B1\u0001\u0011%!1\r\u0005\b\u0005_\u0002A\u0011\u0002B9\u0011\u001d\u00119\b\u0001C\u0005\u0005sBqAa \u0001\t\u0013\u0011\tIA\rDYV\u001cH/\u001a:MS:\\')\u0019;dQR\u000b7o[:UKN$(BA\u001d;\u0003\u0011a\u0017N\\6\u000b\u0005mb\u0014AB:feZ,'OC\u0001>\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001!\u0011\u0005\u0005#U\"\u0001\"\u000b\u0003\r\u000bQa]2bY\u0006L!!\u0012\"\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\t\u0001\n\u0005\u0002J\u00015\t\u0001(A\u0005tG\",G-\u001e7feV\tA\n\u0005\u0002J\u001b&\u0011a\n\u000f\u0002\u0015\u00072,8\u000f^3s\u0019&t7nU2iK\u0012,H.\u001a:\u0002\u0015M\u001c\u0007.\u001a3vY\u0016\u0014\b%\u0001\u0006m_\u000e\fG.\u00113nS:,\u0012A\u0015\t\u0003\u0013NK!\u0001\u0016\u001d\u0003+\rcWo\u001d;fe2Kgn\u001b'pG\u0006d\u0017\tZ7j]\u0006YAn\\2bY\u0006#W.\u001b8!\u0003-\u0011X-\\8uK\u0006#W.\u001b8\u0016\u0003a\u0003\"!W2\u000e\u0003iS!a\u0017/\u0002\u000b\u0005$W.\u001b8\u000b\u0005us\u0016aB2mS\u0016tGo\u001d\u0006\u0003{}S!\u0001Y1\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0011\u0017aA8sO&\u0011AM\u0017\u0002\u000f\u0007>tg\r\\;f]R\fE-\\5o\u00031\u0011X-\\8uK\u0006#W.\u001b8!\u0003A\u0011X-\\8uK\n\u000bGo\u00195BI6Lg.F\u0001i!\tI\u0015.\u0003\u0002kq\tY2\t\\;ti\u0016\u0014H*\u001b8l\u001d>t')\u0019;dQ&tw-\u00113nS:\f\u0011C]3n_R,')\u0019;dQ\u0006#W.\u001b8!\u0003=awnY1m\u0005\u0006$8\r[!e[&t\u0017\u0001\u00057pG\u0006d')\u0019;dQ\u0006#W.\u001b8!\u0003-a\u0017N\\6NC:\fw-\u001a:\u0016\u0003A\u0004\"!S9\n\u0005ID$AE\"mkN$XM\u001d'j].l\u0015M\\1hKJ\fA\u0002\\5oW6\u000bg.Y4fe\u0002\n1\u0002\\5oW6+GO]5dgV\ta\u000f\u0005\u0002Jo&\u0011\u0001\u0010\u000f\u0002\u0013\u00072,8\u000f^3s\u0019&t7.T3ue&\u001c7/\u0001\u0007mS:\\W*\u001a;sS\u000e\u001c\b%A\bnKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s+\u0005a\bCA%~\u0013\tq\bH\u0001\u000eDYV\u001cH/\u001a:MS:\\W*\u001a;bI\u0006$\u0018-T1oC\u001e,'/\u0001\tnKR\fG-\u0019;b\u001b\u0006t\u0017mZ3sA\u0005!A/[7f+\t\t)\u0001\u0005\u0003\u0002\b\u0005EQBAA\u0005\u0015\u0011\tY!!\u0004\u0002\u000bU$\u0018\u000e\\:\u000b\u0007\u0005=a,\u0001\u0004d_6lwN\\\u0005\u0005\u0003'\tIA\u0001\u0003US6,\u0017\u0001\u0003;j[\u0016|F%Z9\u0015\t\u0005e\u0011q\u0004\t\u0004\u0003\u0006m\u0011bAA\u000f\u0005\n!QK\\5u\u0011%\t\tcEA\u0001\u0002\u0004\t)!A\u0002yIE\nQ\u0001^5nK\u0002\nQ!];pi\u0006,\"!!\u000b\u0011\t\u0005-\u0012QF\u0007\u0002u%\u0019\u0011q\u0006\u001e\u0003/\rcWo\u001d;fe2Kgn\u001b*fcV,7\u000f^)v_R\f\u0017AB9v_R\f\u0007%\u0001\u0007ce>\\WM]\"p]\u001aLw-\u0006\u0002\u00028A!\u00111FA\u001d\u0013\r\tYD\u000f\u0002\f\u0017\u000647.Y\"p]\u001aLw-A\u0007ce>\\WM]\"p]\u001aLw\rI\u0001\u000bY&t7nQ8oM&<WCAA\"!\rI\u0015QI\u0005\u0004\u0003\u000fB$!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006YA.\u001b8l\u0007>tg-[4!\u00035\u0001XM]5pI&\u001cG+Y:lgV\u0011\u0011q\n\t\u0004\u0013\u0006E\u0013bAA*q\tA2\t\\;ti\u0016\u0014H*\u001b8l!\u0016\u0014\u0018n\u001c3jGR\u000b7o[:\u0002#A,'/[8eS\u000e$\u0016m]6t?\u0012*\u0017\u000f\u0006\u0003\u0002\u001a\u0005e\u0003\"CA\u00119\u0005\u0005\t\u0019AA(\u00039\u0001XM]5pI&\u001cG+Y:lg\u0002\n1\u0002^3oC:$H*\u001b8lgV\u0011\u0011\u0011\r\t\t\u0003G\ni'!\u001d\u0002x5\u0011\u0011Q\r\u0006\u0005\u0003O\nI'A\u0005j[6,H/\u00192mK*\u0019\u00111\u000e\"\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002p\u0005\u0015$aA'baB\u0019\u0011)a\u001d\n\u0007\u0005U$IA\u0002J]R\u0004b!a\u0019\u0002z\u0005u\u0014\u0002BA>\u0003K\u0012!\"\u00138eKb,GmU3r!\u0011\ty(!\"\u000e\u0005\u0005\u0005%bAABy\u0005\u0011!p[\u0005\u0005\u0003\u000f\u000b\tIA\bDYV\u001cH/\u001a:MS:\\G)\u0019;b\u00031!XM\\1oi2Kgn[:!\u00031\u0011XO\u001c8j]\u001e$\u0016m]6t+\t\ty\t\u0005\u0004\u0002\u0012\u0006]\u00151T\u0007\u0003\u0003'SA!!&\u0002j\u00059Q.\u001e;bE2,\u0017\u0002BAM\u0003'\u00131aU3u!\u001d\t\u0015QTA9\u0003cJ1!a(C\u0005\u0019!V\u000f\u001d7fe\u0005i!/\u001e8oS:<G+Y:lg\u0002\n1\u0002^1tW\u001a+H/\u001e:fgV\u0011\u0011q\u0015\t\t\u0003#\u000bI+! \u0002,&!\u0011qNAJ!\u0019\ti+a-\u000286\u0011\u0011q\u0016\u0006\u0005\u0003c\u000bi!A\u0005j]R,'O\\1mg&!\u0011QWAX\u0005=Y\u0015MZ6b\rV$XO]3J[Bd\u0007cA-\u0002:&\u0019\u00111\u0018.\u0003\r\r{gNZ5h\u00031!\u0018m]6GkR,(/Z:!\u0003\u0015\u0019X\r^+q)\t\tI\u0002K\u0002%\u0003\u000b\u0004B!a2\u0002V6\u0011\u0011\u0011\u001a\u0006\u0005\u0003\u0017\fi-A\u0002ba&TA!a4\u0002R\u00069!.\u001e9ji\u0016\u0014(bAAjC\u0006)!.\u001e8ji&!\u0011q[Ae\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\ti\u0016\f'\u000fR8x]\"\u001aQ%!8\u0011\t\u0005\u001d\u0017q\\\u0005\u0005\u0003C\fIMA\u0005BMR,'/R1dQ\u0006\tB/Z:u!\u0016\u0014\u0018n\u001c3jGR\u000b7o[:)\u0007\u0019\n9\u000f\u0005\u0003\u0002H\u0006%\u0018\u0002BAv\u0003\u0013\u0014A\u0001V3ti\u0006\u0011B/Z:u)\u0006\u001c8nU2iK\u0012,H.\u001b8hQ\r9\u0013q]\u0001\u0016i\u0016\u001cHoU5oO2,G+\u001a8b]R$\u0016m]6tQ\rA\u0013q]\u0001\u0015i\u0016\u001cH/T;mi&$VM\\1oiR\u000b7o[:)\u0007%\n9/A\u000euKN$H+\u001a8b]R\u001cx+\u001b;i\u001d>\u0014V-\u00193z)\u0006\u001c8n\u001d\u0015\u0004U\u0005\u001d\u0018!E2mkN$XM\u001d'j].\u001cuN\u001c4jOR!\u00111\tB\u0001\u0011%\u0011\u0019a\u000bI\u0001\u0002\u0004\u0011)!\u0001\u0007f]\u0006\u0014G.Z$s_V\u00048\u000fE\u0002B\u0005\u000fI1A!\u0003C\u0005\u001d\u0011un\u001c7fC:\f1d\u00197vgR,'\u000fT5oW\u000e{gNZ5hI\u0011,g-Y;mi\u0012\nTC\u0001B\bU\u0011\u0011)A!\u0005,\u0005\tM\u0001\u0003\u0002B\u000b\u0005?i!Aa\u0006\u000b\t\te!1D\u0001\nk:\u001c\u0007.Z2lK\u0012T1A!\bC\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0005C\u00119BA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\f\u0011#\\8dW\u000ec\u0017.\u001a8u\u001b\u0006t\u0017mZ3s)\u0011\u00119C!\f\u0011\u0007%\u0013I#C\u0002\u0003,a\u0012Ad\u00117vgR,'\u000fT5oW\u0012+7\u000f^\"mS\u0016tG/T1oC\u001e,'\u000fC\u0004\u000305\u0002\r!! \u0002\u00111Lgn\u001b#bi\u0006\f!\u0002^3oC:$H*\u001b8l)\u0019\tiH!\u000e\u0003:!9!q\u0007\u0018A\u0002\u0005E\u0014a\u0003;f]\u0006tG/\u00138eKbDqAa\u000f/\u0001\u0004\t\t(A\u0005mS:\\\u0017J\u001c3fq\u0006A\u0011\r\u001a3UCN\\7\u000f\u0006\u0005\u0002\u001a\t\u0005#1\tB$\u0011\u001d\u00119d\fa\u0001\u0003cBqA!\u00120\u0001\u0004\t\t(\u0001\bgSJ\u001cH\u000fT5oW&sG-\u001a=\t\u000f\t%s\u00061\u0001\u0002r\u0005Aa.^7MS:\\7/A\u0004sk:|enY3\u0002\u001dI,hn\u00148dK\u0006sGmV1jiR!\u0011\u0011\u0004B)\u0011\u001d\u0011\u0019&\ra\u0001\u0005+\nq\"\u001a=qK\u000e$X\r\u001a*v]:Lgn\u001a\t\u0006\u0003\n]\u00131T\u0005\u0004\u00053\u0012%A\u0003\u001fsKB,\u0017\r^3e}\u0005qq/Y5u\r>\u0014(+\u001e8oS:<G\u0003BA\r\u0005?BqAa\u00153\u0001\u0004\u0011)&A\nu_BL7mQ8oM&<7+\u001f8d)\u0006\u001c8\u000e\u0006\u0004\u0003f\t-$Q\u000e\t\u0004\u0013\n\u001d\u0014b\u0001B5q\ta2\t\\;ti\u0016\u0014H*\u001b8l'ft7\rV8qS\u000e\u001c8i\u001c8gS\u001e\u001c\bb\u0002B\u001cg\u0001\u0007\u0011\u0011\u000f\u0005\b\u0005w\u0019\u0004\u0019AA9\u0003q\u0019w.\u001c9mKR,Gk\u001c9jG\u000e{gNZ5h'ft7\rV1tWN$B!!\u0007\u0003t!9!Q\u000f\u001bA\u0002\tU\u0013!\u00027j].\u001c\u0018aE<bSR4uN]!e[&t'+Z9vKN$HCBAV\u0005w\u0012i\bC\u0004\u00038U\u0002\r!!\u001d\t\u000f\tmR\u00071\u0001\u0002r\u0005\u0001r/Y5u\r>\u0014H+Y:l\u0007>,h\u000e\u001e\u000b\u0005\u00033\u0011\u0019\tC\u0004\u0003\u0006Z\u0002\r!!\u001d\u0002\u000b\r|WO\u001c;")
/* loaded from: input_file:kafka/server/link/ClusterLinkBatchTasksTest.class */
public class ClusterLinkBatchTasksTest {
    private final ClusterLinkScheduler scheduler;
    private final ClusterLinkLocalAdmin localAdmin;
    private final ConfluentAdmin remoteAdmin;
    private final ClusterLinkNonBatchingAdmin kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin;
    private final ClusterLinkNonBatchingAdmin kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin;
    private final ClusterLinkManager linkManager;
    private final ClusterLinkMetrics kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics;
    private final ClusterLinkMetadataManager kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager;
    private Time kafka$server$link$ClusterLinkBatchTasksTest$$time;
    private final ClusterLinkRequestQuota kafka$server$link$ClusterLinkBatchTasksTest$$quota;
    private final KafkaConfig brokerConfig;
    private final ClusterLinkConfig linkConfig;
    private ClusterLinkPeriodicTasks periodicTasks;
    private final Map<Object, IndexedSeq<ClusterLinkData>> tenantLinks;
    private final Set<Tuple2<Object, Object>> kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks;
    private final scala.collection.mutable.Map<ClusterLinkData, KafkaFutureImpl<Config>> taskFutures;

    private ClusterLinkScheduler scheduler() {
        return this.scheduler;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ClusterLinkLocalAdmin localAdmin() {
        return this.localAdmin;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ConfluentAdmin remoteAdmin() {
        return this.remoteAdmin;
    }

    public ClusterLinkNonBatchingAdmin kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin() {
        return this.kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin;
    }

    public ClusterLinkNonBatchingAdmin kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin() {
        return this.kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin;
    }

    private ClusterLinkManager linkManager() {
        return this.linkManager;
    }

    public ClusterLinkMetrics kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics() {
        return this.kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics;
    }

    public ClusterLinkMetadataManager kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager() {
        return this.kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager;
    }

    public Time kafka$server$link$ClusterLinkBatchTasksTest$$time() {
        return this.kafka$server$link$ClusterLinkBatchTasksTest$$time;
    }

    private void time_$eq(Time time) {
        this.kafka$server$link$ClusterLinkBatchTasksTest$$time = time;
    }

    public ClusterLinkRequestQuota kafka$server$link$ClusterLinkBatchTasksTest$$quota() {
        return this.kafka$server$link$ClusterLinkBatchTasksTest$$quota;
    }

    private KafkaConfig brokerConfig() {
        return this.brokerConfig;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ClusterLinkConfig linkConfig() {
        return this.linkConfig;
    }

    private ClusterLinkPeriodicTasks periodicTasks() {
        return this.periodicTasks;
    }

    private void periodicTasks_$eq(ClusterLinkPeriodicTasks clusterLinkPeriodicTasks) {
        this.periodicTasks = clusterLinkPeriodicTasks;
    }

    private Map<Object, IndexedSeq<ClusterLinkData>> tenantLinks() {
        return this.tenantLinks;
    }

    public Set<Tuple2<Object, Object>> kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks() {
        return this.kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks;
    }

    private scala.collection.mutable.Map<ClusterLinkData, KafkaFutureImpl<Config>> taskFutures() {
        return this.taskFutures;
    }

    @BeforeEach
    public void setUp() {
        scheduler().startup();
        kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics().startup();
        Mockito.when(brokerConfig().clusterLinkPeriodicTaskBatchSize()).thenReturn(Predef$.MODULE$.int2Integer(2));
        Mockito.when(brokerConfig().clusterLinkPeriodicTaskBatchIntervalMs()).thenReturn(Predef$.MODULE$.int2Integer(10));
        Mockito.when(linkManager().brokerConfig()).thenReturn(brokerConfig());
        Mockito.when(BoxesRunTime.boxToBoolean(kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager().isLinkCoordinator((String) ArgumentMatchers.any()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager().getTopicConfig((String) ArgumentMatchers.any())).thenReturn(new Properties());
    }

    @AfterEach
    public void tearDown() {
        scheduler().shutdown();
        kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics().shutdown();
    }

    @Test
    public void testPeriodicTasks() {
        time_$eq(new MockTime(1L));
        periodicTasks_$eq(new ClusterLinkPeriodicTasks(brokerConfig(), linkManager(), scheduler(), kafka$server$link$ClusterLinkBatchTasksTest$$time()));
        IntRef create = IntRef.create(0);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        KafkaFutureImpl kafkaFutureImpl3 = new KafkaFutureImpl();
        KafkaFutureImpl kafkaFutureImpl4 = new KafkaFutureImpl();
        Mockito.when(remoteAdmin().describeCluster((DescribeClusterOptions) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            create.elem++;
            return ConfluentAdminUtils.newDescribeClusterResult(kafkaFutureImpl, kafkaFutureImpl2, kafkaFutureImpl3, kafkaFutureImpl4);
        });
        KafkaFutureImpl kafkaFutureImpl5 = new KafkaFutureImpl();
        Mockito.when(remoteAdmin().listTopics()).thenAnswer(invocationOnMock2 -> {
            create.elem++;
            return ConfluentAdminUtils.newListTopicsResult(kafkaFutureImpl5);
        });
        ClusterLinkData tenantLink = tenantLink(0, 0);
        ClusterLinkDestClientManager mockClientManager = mockClientManager(tenantLink);
        periodicTasks().addTask(ClusterLinkCheckAvailabilityTaskType$.MODULE$, tenantLink, new ClusterLinkCheckLinkAvailability(() -> {
            return this.linkConfig();
        }, kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics(), linkManager(), 10, scheduler(), tenantLink, kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$time(), kafka$server$link$ClusterLinkBatchTasksTest$$quota()));
        waitForTaskCount(1);
        ClusterLinkTask.TaskInfo taskInfo = (ClusterLinkTask.TaskInfo) periodicTasks().subTask(new PeriodicTaskKey(tenantLink.linkId(), ClusterLinkCheckAvailabilityTaskType$.MODULE$)).get();
        Assertions.assertEquals(tenantLink.linkId(), taskInfo.linkId());
        Assertions.assertEquals(None$.MODULE$, taskInfo.topic());
        Assertions.assertNotEquals(UnboundedClusterLinkRequestQuota$.MODULE$, taskInfo.task().quota());
        Assertions.assertNull(taskInfo.future(), "Task started too early");
        periodicTasks().addTask(ClusterLinkAutoMirroringTaskType$.MODULE$, tenantLink, new ClusterLinkAutoCreateMirror(mockClientManager, kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager(), tenantLink, kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics(), kafka$server$link$ClusterLinkBatchTasksTest$$time(), kafka$server$link$ClusterLinkBatchTasksTest$$quota()));
        waitForTaskCount(2);
        periodicTasks().startup();
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(5000L);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            int i = create.elem;
            Integer boxToInteger = BoxesRunTime.boxToInteger(i);
            if ($anonfun$testPeriodicTasks$4(2, i)) {
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Assertions.assertEquals(2, create.elem);
        Assertions.assertFalse(taskInfo.future().isDone(), "Availability task completed too early");
        Assertions.assertFalse(((ClusterLinkTask.TaskInfo) periodicTasks().subTask(new PeriodicTaskKey(tenantLink.linkId(), ClusterLinkAutoMirroringTaskType$.MODULE$)).get()).future().isDone(), "Auto-mirroring task completed too early");
        ClusterLinkData tenantLink2 = tenantLink(1, 1);
        periodicTasks().addTask(ClusterLinkAutoMirroringTaskType$.MODULE$, tenantLink2, new ClusterLinkAutoCreateMirror(mockClientManager(tenantLink2), kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager(), tenantLink2, kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics(), kafka$server$link$ClusterLinkBatchTasksTest$$time(), kafka$server$link$ClusterLinkBatchTasksTest$$quota()));
        waitForTaskCount(3);
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(5000L);
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            int i2 = create.elem;
            Integer boxToInteger2 = BoxesRunTime.boxToInteger(i2);
            if ($anonfun$testPeriodicTasks$4(3, i2)) {
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger2), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger2), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Assertions.assertEquals(3, create.elem);
        ClusterLinkTask.TaskInfo taskInfo2 = (ClusterLinkTask.TaskInfo) periodicTasks().subTask(new PeriodicTaskKey(tenantLink2.linkId(), ClusterLinkAutoMirroringTaskType$.MODULE$)).get();
        Assertions.assertFalse(taskInfo2.future().isDone(), "Availability task completed too early");
        ClusterLinkPeriodicTasks periodicTasks = periodicTasks();
        periodicTasks.shutdownTasks(tenantLink2.linkId(), "link deleted", periodicTasks.shutdownTasks$default$3());
        waitForTaskCount(2);
        Assertions.assertTrue(taskInfo2.future().isCancelled(), "Task not cancelled");
        ClusterLinkPeriodicTasks periodicTasks2 = periodicTasks();
        periodicTasks2.shutdownTasks(tenantLink.linkId(), "link paused", periodicTasks2.shutdownTasks$default$3());
        waitForTaskCount(0);
        periodicTasks().addTask(ClusterLinkCheckAvailabilityTaskType$.MODULE$, tenantLink, new ClusterLinkCheckLinkAvailability(() -> {
            return this.linkConfig();
        }, kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics(), linkManager(), 10, scheduler(), tenantLink, kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$time(), kafka$server$link$ClusterLinkBatchTasksTest$$quota()));
        periodicTasks().addTask(ClusterLinkAutoMirroringTaskType$.MODULE$, tenantLink, new ClusterLinkAutoCreateMirror(mockClientManager, kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager(), tenantLink, kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin(), kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics(), kafka$server$link$ClusterLinkBatchTasksTest$$time(), kafka$server$link$ClusterLinkBatchTasksTest$$quota()));
        waitForTaskCount(2);
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(5000L);
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (true) {
            int i3 = create.elem;
            Integer boxToInteger3 = BoxesRunTime.boxToInteger(i3);
            if ($anonfun$testPeriodicTasks$4(5, i3)) {
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger3), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                    Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger3), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Assertions.assertEquals(5, create.elem);
        periodicTasks().shutdownAllTasks();
        waitForTaskCount(0);
    }

    @Test
    public void testTaskScheduling() {
        periodicTasks_$eq(new ClusterLinkPeriodicTasks(brokerConfig(), linkManager(), scheduler(), kafka$server$link$ClusterLinkBatchTasksTest$$time()));
        runOnce();
        addTasks(0, 0, 1);
        runOnce();
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks().size());
        ClusterLinkTask.TaskInfo taskInfo = (ClusterLinkTask.TaskInfo) periodicTasks().subTask(new PeriodicTaskKey(tenantLink(0, 0).linkId(), ClusterLinkSyncTopicConfigsTaskType$.MODULE$)).get();
        Assertions.assertNull(taskInfo.future());
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        runOnce();
        waitForRunning(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0)}));
        Assertions.assertNotNull(taskInfo.future());
        KafkaFuture future = taskInfo.future();
        Assertions.assertFalse(future.isDone());
        waitForAdminRequest(0, 0);
        runOnce();
        waitForRunning(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0)}));
        completeTopicConfigSyncTasks(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0)}));
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks().size());
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(2000L);
        runOnce();
        waitForRunning(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0)}));
        KafkaFuture future2 = taskInfo.future();
        Assertions.assertNotEquals(future, future2);
        Assertions.assertFalse(future2.isDone());
        periodicTasks().shutdown();
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.addTasks(0, 0, 1);
        });
        periodicTasks().shutdownAllTasks();
        Assertions.assertTrue(future2.isCancelled());
        waitForTaskCount(0);
    }

    @Test
    public void testSingleTenantTasks() {
        periodicTasks_$eq(new ClusterLinkPeriodicTasks(brokerConfig(), linkManager(), scheduler(), kafka$server$link$ClusterLinkBatchTasksTest$$time()));
        addTasks(0, 0, 3);
        verifyRun$1(Nil$.MODULE$);
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        verifyRun$1(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1}));
        completeTopicConfigSyncTasks(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 1)}));
        verifyRun$1(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 2}));
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1L);
        completeTopicConfigSyncTasks(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 2), new Tuple2.mcII.sp(0, 0)}));
        ClusterLinkTask.TaskInfo taskInfo = (ClusterLinkTask.TaskInfo) periodicTasks().subTask(new PeriodicTaskKey(tenantLink(0, 2).linkId(), ClusterLinkSyncTopicConfigsTaskType$.MODULE$)).get();
        taskInfo.lastRunMs_$eq(taskInfo.lastRunMs() - 5);
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        verifyRun$1(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2}));
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1L);
        completeTopicConfigSyncTasks(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 1), new Tuple2.mcII.sp(0, 2)}));
        ClusterLinkTask.TaskInfo taskInfo2 = (ClusterLinkTask.TaskInfo) periodicTasks().subTask(new PeriodicTaskKey(tenantLink(0, 1).linkId(), ClusterLinkSyncTopicConfigsTaskType$.MODULE$)).get();
        taskInfo2.lastRunMs_$eq(taskInfo2.lastRunMs() - 5);
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        verifyRun$1(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1}));
        addTasks(0, 3, 2);
        completeTopicConfigSyncTasks(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(0, 1)}));
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        verifyRun$1(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{2, 3}));
        ClusterLinkPeriodicTasks periodicTasks = periodicTasks();
        periodicTasks.shutdownTasks(tenantLink(0, 2).linkId(), "link deleted", periodicTasks.shutdownTasks$default$3());
        waitForTaskCount(4);
        verifyRun$1(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{3, 4}));
    }

    @Test
    public void testMultiTenantTasks() {
        IntRef create = IntRef.create(3);
        Mockito.when(brokerConfig().clusterLinkPeriodicTaskBatchSize()).thenAnswer(invocationOnMock -> {
            return BoxesRunTime.boxToInteger($anonfun$testMultiTenantTasks$1(create, invocationOnMock));
        });
        periodicTasks_$eq(new ClusterLinkPeriodicTasks(brokerConfig(), linkManager(), scheduler(), kafka$server$link$ClusterLinkBatchTasksTest$$time()));
        addTasks(0, 0, 5);
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(0, 1), new Tuple2.mcII.sp(0, 2)}));
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(0, 1), new Tuple2.mcII.sp(0, 2)}));
        addTasks(1, 0, 5);
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(0, 1), new Tuple2.mcII.sp(0, 2), new Tuple2.mcII.sp(1, 0)}));
        addTasks(2, 0, 5);
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(0, 1), new Tuple2.mcII.sp(0, 2), new Tuple2.mcII.sp(1, 0), new Tuple2.mcII.sp(2, 0)}));
        completeTopicConfigSyncTasks(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(2, 0)}));
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(0, 1), new Tuple2.mcII.sp(0, 2), new Tuple2.mcII.sp(1, 0), new Tuple2.mcII.sp(2, 1)}));
        completeTopicConfigSyncTasks(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0)}));
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 1), new Tuple2.mcII.sp(0, 2), new Tuple2.mcII.sp(1, 0), new Tuple2.mcII.sp(2, 1)}));
        create.elem = 5;
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 1), new Tuple2.mcII.sp(0, 2), new Tuple2.mcII.sp(1, 0), new Tuple2.mcII.sp(2, 1), new Tuple2.mcII.sp(1, 1)}));
        create.elem = 6;
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 1), new Tuple2.mcII.sp(0, 2), new Tuple2.mcII.sp(1, 0), new Tuple2.mcII.sp(2, 1), new Tuple2.mcII.sp(1, 1), new Tuple2.mcII.sp(2, 2)}));
    }

    @Test
    public void testTenantsWithNoReadyTasks() {
        int i = 5;
        Mockito.when(brokerConfig().clusterLinkPeriodicTaskBatchSize()).thenAnswer(invocationOnMock -> {
            return BoxesRunTime.boxToInteger($anonfun$testTenantsWithNoReadyTasks$1(i, invocationOnMock));
        });
        periodicTasks_$eq(new ClusterLinkPeriodicTasks(brokerConfig(), linkManager(), scheduler(), kafka$server$link$ClusterLinkBatchTasksTest$$time()));
        addTasks(0, 0, 1);
        runOnceAndWait(Nil$.MODULE$);
        Assertions.assertEquals(0L, periodicTasks().maxTaskWaitMs());
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0)}));
        Assertions.assertEquals(0L, periodicTasks().maxTaskWaitMs());
        addTasks(1, 0, 5);
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0)}));
        Assertions.assertEquals(0L, periodicTasks().maxTaskWaitMs());
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(1, 0), new Tuple2.mcII.sp(1, 1), new Tuple2.mcII.sp(1, 2), new Tuple2.mcII.sp(1, 3)}));
        Assertions.assertEquals(1001L, periodicTasks().maxTaskWaitMs());
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(1, 0), new Tuple2.mcII.sp(1, 1), new Tuple2.mcII.sp(1, 2), new Tuple2.mcII.sp(1, 3)}));
        Assertions.assertEquals(2001L, periodicTasks().maxTaskWaitMs());
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1L);
        completeTopicConfigSyncTasks(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(1, 1), new Tuple2.mcII.sp(1, 2)}));
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(1, 0), new Tuple2.mcII.sp(1, 3), new Tuple2.mcII.sp(1, 4)}));
        Assertions.assertEquals(0L, periodicTasks().maxTaskWaitMs());
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1L);
        completeTopicConfigSyncTasks(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(1, 0)}));
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(1, 3), new Tuple2.mcII.sp(1, 4)}));
        Assertions.assertEquals(0L, periodicTasks().maxTaskWaitMs());
        kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1000L);
        runOnceAndWait(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcII.sp(0, 0), new Tuple2.mcII.sp(1, 1), new Tuple2.mcII.sp(1, 2), new Tuple2.mcII.sp(1, 3), new Tuple2.mcII.sp(1, 4)}));
        Assertions.assertEquals(1000L, periodicTasks().maxTaskWaitMs());
    }

    private ClusterLinkConfig clusterLinkConfig(boolean z) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:1234");
        properties.put(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), Boolean.toString(z));
        if (z) {
            properties.put(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), ClusterLinkTestUtils$.MODULE$.AllGroupsFilter());
        }
        properties.put(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "1000");
        properties.put(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        properties.put(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), "{\"topicFilters\":[{\"name\":\"*\",\"filterType\":\"INCLUDE\",\"patternType\":\"LITERAL\"}]}");
        properties.put("metadata.max.age.ms", "2000");
        ClusterLinkConfig$ clusterLinkConfig$ = ClusterLinkConfig$.MODULE$;
        None$ none$ = None$.MODULE$;
        ClusterLinkConfig$ clusterLinkConfig$2 = ClusterLinkConfig$.MODULE$;
        return clusterLinkConfig$.create(properties, none$, true);
    }

    private boolean clusterLinkConfig$default$1() {
        return true;
    }

    private ClusterLinkDestClientManager mockClientManager(ClusterLinkData clusterLinkData) {
        ClusterLinkDestClientManager clusterLinkDestClientManager = (ClusterLinkDestClientManager) Mockito.mock(ClusterLinkDestClientManager.class);
        Mockito.when(clusterLinkDestClientManager.scheduler()).thenReturn(scheduler());
        Mockito.when(clusterLinkDestClientManager.metadataManager()).thenReturn(kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager());
        Mockito.when(clusterLinkDestClientManager.localAdminFactory()).thenReturn(() -> {
            return this.localAdmin();
        });
        Mockito.when(clusterLinkDestClientManager.remoteBatchAdmin()).thenReturn(kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin());
        Mockito.when(clusterLinkDestClientManager.localBatchAdmin()).thenReturn(kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin());
        Mockito.when(clusterLinkDestClientManager.metrics()).thenReturn(kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics());
        Mockito.when(clusterLinkDestClientManager.time()).thenReturn(kafka$server$link$ClusterLinkBatchTasksTest$$time());
        Mockito.when(clusterLinkDestClientManager.linkData()).thenReturn(clusterLinkData);
        Mockito.when(clusterLinkDestClientManager.alterConfigPolicy()).thenReturn(None$.MODULE$);
        Mockito.when(clusterLinkDestClientManager.currentConfig()).thenReturn(linkConfig());
        Mockito.when(clusterLinkDestClientManager.topicConfigSyncRules()).thenReturn(linkConfig().topicConfigSyncRules());
        Mockito.when(clusterLinkDestClientManager.requestQuota()).thenReturn(kafka$server$link$ClusterLinkBatchTasksTest$$quota());
        return clusterLinkDestClientManager;
    }

    private ClusterLinkData tenantLink(int i, int i2) {
        return (ClusterLinkData) ((SeqOps) tenantLinks().apply(BoxesRunTime.boxToInteger(i))).apply(i2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addTasks(int i, int i2, int i3) {
        int subTaskCount = periodicTasks().subTaskCount();
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(i2), i2 + i3).foreach$mVc$sp(i4 -> {
            this.periodicTasks().addTask(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, this.tenantLink(i, i4), this.topicConfigSyncTask(i, i4));
            this.kafka$server$link$ClusterLinkBatchTasksTest$$time().sleep(1L);
        });
        waitForTaskCount(subTaskCount + i3);
    }

    private void runOnce() {
        periodicTasks().runOnce().get(15L, TimeUnit.SECONDS);
    }

    private void runOnceAndWait(Seq<Tuple2<Object, Object>> seq) {
        runOnce();
        waitForRunning(seq);
    }

    private void waitForRunning(Seq<Tuple2<Object, Object>> seq) {
        Tuple2 $minus$greater$extension;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            scala.collection.immutable.Set $anonfun$waitForRunning$1 = $anonfun$waitForRunning$1(this);
            if ($anonfun$waitForRunning$2(seq, $anonfun$waitForRunning$1)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$waitForRunning$1), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$waitForRunning$1), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(seq.toSet(), (scala.collection.immutable.Set) tuple2._1());
        seq.foreach(tuple22 -> {
            Tuple2 $minus$greater$extension2;
            if (tuple22 == null) {
                throw new MatchError((Object) null);
            }
            int _1$mcI$sp = tuple22._1$mcI$sp();
            int _2$mcI$sp = tuple22._2$mcI$sp();
            this.waitForAdminRequest(_1$mcI$sp, _2$mcI$sp);
            ClusterLinkTask.TaskInfo taskInfo = (ClusterLinkTask.TaskInfo) this.periodicTasks().subTask(new PeriodicTaskKey(this.tenantLink(_1$mcI$sp, _2$mcI$sp).linkId(), ClusterLinkSyncTopicConfigsTaskType$.MODULE$)).get();
            TestUtils$ testUtils$4 = TestUtils$.MODULE$;
            TestUtils$ testUtils$5 = TestUtils$.MODULE$;
            TestUtils$ testUtils$6 = TestUtils$.MODULE$;
            long currentTimeMillis2 = System.currentTimeMillis();
            while (true) {
                KafkaFuture future = taskInfo.future();
                if ($anonfun$waitForRunning$5(future)) {
                    $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(future), BoxesRunTime.boxToBoolean(true));
                    break;
                }
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(future), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
            Tuple2 tuple22 = $minus$greater$extension2;
            if (tuple22 == null) {
                throw new MatchError((Object) null);
            }
            KafkaFuture kafkaFuture = (KafkaFuture) tuple22._1();
            Assertions.assertTrue(tuple22._2$mcZ$sp());
            return kafkaFuture.whenComplete((taskResult, th) -> {
                this.kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks().$minus$eq(new Tuple2.mcII.sp(_1$mcI$sp, _2$mcI$sp));
            });
        });
    }

    private ClusterLinkSyncTopicsConfigs topicConfigSyncTask(final int i, final int i2) {
        final ClusterLinkData tenantLink = tenantLink(i, i2);
        final ClusterLinkDestClientManager mockClientManager = mockClientManager(tenantLink);
        String sb = new StringBuilder(5).append(tenantLink.linkName()).append("Topic").toString();
        Mockito.when(mockClientManager.getTopics()).thenReturn(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{sb})));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, sb);
        Mockito.when(remoteAdmin().describeConfigs((Collection) ArgumentMatchers.eq(Collections.singleton(configResource)), (DescribeConfigsOptions) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
            this.taskFutures().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tenantLink), kafkaFutureImpl));
            return new DescribeConfigsResult(Collections.singletonMap(configResource, kafkaFutureImpl));
        });
        return new ClusterLinkSyncTopicsConfigs(this, mockClientManager, tenantLink, i, i2) { // from class: kafka.server.link.ClusterLinkBatchTasksTest$$anon$1
            private final /* synthetic */ ClusterLinkBatchTasksTest $outer;
            private final int tenantIndex$3;
            private final int linkIndex$2;

            public ClusterLinkScheduler.TaskResult run() {
                Tuple2.mcII.sp spVar = new Tuple2.mcII.sp(this.tenantIndex$3, this.linkIndex$2);
                Assertions.assertFalse(this.$outer.kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks().contains(spVar));
                this.$outer.kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks().$plus$eq(spVar);
                return super.run();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.tenantIndex$3 = i;
                this.linkIndex$2 = i2;
                ClusterLinkMetadataManager kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager = this.kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager();
                ClusterLinkNonBatchingAdmin kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin = this.kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin();
                ClusterLinkNonBatchingAdmin kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin = this.kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin();
                ClusterLinkMetrics kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics = this.kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics();
                Time kafka$server$link$ClusterLinkBatchTasksTest$$time = this.kafka$server$link$ClusterLinkBatchTasksTest$$time();
                ClusterLinkRequestQuota kafka$server$link$ClusterLinkBatchTasksTest$$quota = this.kafka$server$link$ClusterLinkBatchTasksTest$$quota();
            }
        };
    }

    private void completeTopicConfigSyncTasks(Seq<Tuple2<Object, Object>> seq) {
        seq.foreach(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$completeTopicConfigSyncTasks$1(this, tuple2));
        });
        runOnce();
        seq.foreach(tuple22 -> {
            $anonfun$completeTopicConfigSyncTasks$2(this, tuple22);
            return BoxedUnit.UNIT;
        });
    }

    private KafkaFutureImpl<Config> waitForAdminRequest(int i, int i2) {
        Tuple2 $minus$greater$extension;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            Option $anonfun$waitForAdminRequest$1 = $anonfun$waitForAdminRequest$1(this, i, i2);
            if ($anonfun$waitForAdminRequest$1.nonEmpty()) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$waitForAdminRequest$1), BoxesRunTime.boxToBoolean(true));
                break;
            }
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$waitForAdminRequest$1), BoxesRunTime.boxToBoolean(false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) tuple2._1();
        Assertions.assertNotEquals(None$.MODULE$, option);
        return (KafkaFutureImpl) option.get();
    }

    private void waitForTaskCount(int i) {
        Tuple2 $minus$greater$extension;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            int $anonfun$waitForTaskCount$1 = $anonfun$waitForTaskCount$1(this);
            Integer boxToInteger = BoxesRunTime.boxToInteger($anonfun$waitForTaskCount$1);
            if ($anonfun$waitForTaskCount$2(i, $anonfun$waitForTaskCount$1)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(i, tuple2._1$mcI$sp());
    }

    public static final /* synthetic */ ClusterLinkData $anonfun$tenantLinks$2(int i, int i2) {
        return new ClusterLinkData(new StringBuilder(13).append(i).append("{tenant}_link").append(i2).toString(), Uuid.randomUuid(), None$.MODULE$, new Some(new StringBuilder(1).append(i).append("_").toString()), false);
    }

    public static final /* synthetic */ Tuple2 $anonfun$tenantLinks$1(int i) {
        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(i)), RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 5).map(obj -> {
            return $anonfun$tenantLinks$2(i, BoxesRunTime.unboxToInt(obj));
        }));
    }

    public static final /* synthetic */ boolean $anonfun$testPeriodicTasks$4(int i, int i2) {
        return i2 == i;
    }

    private static final void waitForPeriodicTaskRequests$1(int i, IntRef intRef) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            int i2 = intRef.elem;
            Integer boxToInteger = BoxesRunTime.boxToInteger(i2);
            if ($anonfun$testPeriodicTasks$4(i, i2)) {
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Assertions.assertEquals(i, intRef.elem);
    }

    public static final /* synthetic */ Tuple2 $anonfun$testSingleTenantTasks$1(int i) {
        return new Tuple2.mcII.sp(0, i);
    }

    private final void verifyRun$1(Seq seq) {
        runOnceAndWait((Seq) seq.map(obj -> {
            return $anonfun$testSingleTenantTasks$1(BoxesRunTime.unboxToInt(obj));
        }));
    }

    public static final /* synthetic */ int $anonfun$testMultiTenantTasks$1(IntRef intRef, InvocationOnMock invocationOnMock) {
        return intRef.elem;
    }

    public static final /* synthetic */ int $anonfun$testTenantsWithNoReadyTasks$1(int i, InvocationOnMock invocationOnMock) {
        return i;
    }

    public static final /* synthetic */ scala.collection.immutable.Set $anonfun$waitForRunning$1(ClusterLinkBatchTasksTest clusterLinkBatchTasksTest) {
        return clusterLinkBatchTasksTest.kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks().toSet();
    }

    public static final /* synthetic */ boolean $anonfun$waitForRunning$2(Seq seq, scala.collection.immutable.Set set) {
        scala.collection.immutable.Set set2 = seq.toSet();
        return set == null ? set2 == null : set.equals(set2);
    }

    public static final /* synthetic */ boolean $anonfun$waitForRunning$5(KafkaFuture kafkaFuture) {
        return (kafkaFuture == null || kafkaFuture.isDone()) ? false : true;
    }

    public static final /* synthetic */ boolean $anonfun$completeTopicConfigSyncTasks$1(ClusterLinkBatchTasksTest clusterLinkBatchTasksTest, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        return ((KafkaFutureImpl) clusterLinkBatchTasksTest.taskFutures().apply(clusterLinkBatchTasksTest.tenantLink(tuple2._1$mcI$sp(), tuple2._2$mcI$sp()))).complete(new Config(Collections.emptySet()));
    }

    public static final /* synthetic */ void $anonfun$completeTopicConfigSyncTasks$2(ClusterLinkBatchTasksTest clusterLinkBatchTasksTest, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertTrue(((ClusterLinkScheduler.TaskResult) ((ClusterLinkTask.TaskInfo) clusterLinkBatchTasksTest.periodicTasks().subTask(new PeriodicTaskKey(clusterLinkBatchTasksTest.tenantLink(tuple2._1$mcI$sp(), tuple2._2$mcI$sp()).linkId(), ClusterLinkSyncTopicConfigsTaskType$.MODULE$)).get()).future().get(15L, TimeUnit.SECONDS)).completed());
    }

    public static final /* synthetic */ Option $anonfun$waitForAdminRequest$1(ClusterLinkBatchTasksTest clusterLinkBatchTasksTest, int i, int i2) {
        return clusterLinkBatchTasksTest.taskFutures().get(clusterLinkBatchTasksTest.tenantLink(i, i2));
    }

    public static final /* synthetic */ int $anonfun$waitForTaskCount$1(ClusterLinkBatchTasksTest clusterLinkBatchTasksTest) {
        return clusterLinkBatchTasksTest.periodicTasks().subTaskCount();
    }

    public static final /* synthetic */ boolean $anonfun$waitForTaskCount$2(int i, int i2) {
        return i2 == i;
    }

    public ClusterLinkBatchTasksTest() {
        ClusterLinkScheduler$ clusterLinkScheduler$ = ClusterLinkScheduler$.MODULE$;
        ClusterLinkScheduler$ clusterLinkScheduler$2 = ClusterLinkScheduler$.MODULE$;
        this.scheduler = new ClusterLinkScheduler(0, 100);
        this.localAdmin = (ClusterLinkLocalAdmin) Mockito.mock(ClusterLinkLocalAdmin.class);
        this.remoteAdmin = (ConfluentAdmin) Mockito.mock(ConfluentAdmin.class);
        this.kafka$server$link$ClusterLinkBatchTasksTest$$remoteBatchAdmin = new ClusterLinkNonBatchingAdmin(() -> {
            return this.remoteAdmin();
        });
        this.kafka$server$link$ClusterLinkBatchTasksTest$$localBatchAdmin = new ClusterLinkNonBatchingAdmin(() -> {
            return this.localAdmin();
        });
        this.linkManager = (ClusterLinkManager) Mockito.mock(ClusterLinkManager.class);
        this.kafka$server$link$ClusterLinkBatchTasksTest$$linkMetrics = new ClusterLinkMetrics("", Uuid.randomUuid(), ClusterLinkConfig.LinkMode.DESTINATION, ConnectionMode$Outbound$.MODULE$, ConnectionMode$Inbound$.MODULE$, false, (ClusterLinkManager) Mockito.mock(ClusterLinkManager.class), None$.MODULE$, new Metrics(), None$.MODULE$, false);
        this.kafka$server$link$ClusterLinkBatchTasksTest$$metadataManager = (ClusterLinkMetadataManager) Mockito.mock(ClusterLinkMetadataManager.class);
        this.kafka$server$link$ClusterLinkBatchTasksTest$$time = new MockTime(0L);
        this.kafka$server$link$ClusterLinkBatchTasksTest$$quota = (ClusterLinkRequestQuota) Mockito.mock(ClusterLinkRequestQuota.class);
        this.brokerConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        this.linkConfig = clusterLinkConfig(true);
        this.tenantLinks = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 3).map(obj -> {
            return $anonfun$tenantLinks$1(BoxesRunTime.unboxToInt(obj));
        }).toMap($less$colon$less$.MODULE$.refl());
        this.kafka$server$link$ClusterLinkBatchTasksTest$$runningTasks = (Set) Set$.MODULE$.empty();
        this.taskFutures = (scala.collection.mutable.Map) Map$.MODULE$.empty();
    }
}
