package kafka.server.link;

import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import kafka.controller.KafkaController;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.LogCaptureAppender;
import kafka.utils.LogCaptureAppender$;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.ArrayOps$;
import scala.collection.Set;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkDestConnectionManagerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\t5g\u0001\u0002\u001e<\u0001\tCQ!\u0013\u0001\u0005\u0002)Cq!\u0014\u0001C\u0002\u0013%a\n\u0003\u0004T\u0001\u0001\u0006Ia\u0014\u0005\b)\u0002\u0011\r\u0011\"\u0003V\u0011\u0019q\u0006\u0001)A\u0005-\"9q\f\u0001b\u0001\n\u0013\u0001\u0007BB4\u0001A\u0003%\u0011\rC\u0004i\u0001\t\u0007I\u0011B+\t\r%\u0004\u0001\u0015!\u0003W\u0011\u001dQ\u0007A1A\u0005\nUCaa\u001b\u0001!\u0002\u00131\u0006b\u00027\u0001\u0005\u0004%I!\u001c\u0005\u0007i\u0002\u0001\u000b\u0011\u00028\t\u000fU\u0004!\u0019!C\u0005m\"1!\u0010\u0001Q\u0001\n]Dqa\u001f\u0001C\u0002\u0013%A\u0010C\u0004\u0002\u0006\u0001\u0001\u000b\u0011B?\t\u0013\u0005\u001d\u0001A1A\u0005\n\u0005%\u0001\u0002CA\t\u0001\u0001\u0006I!a\u0003\t\u0013\u0005M\u0001A1A\u0005\n\u0005U\u0001\u0002CA\u0017\u0001\u0001\u0006I!a\u0006\t\u0013\u0005=\u0002A1A\u0005\n\u0005E\u0002\u0002CA\"\u0001\u0001\u0006I!a\r\t\u0013\u0005\u0015\u0003A1A\u0005\n\u0005\u001d\u0003\u0002CA,\u0001\u0001\u0006I!!\u0013\t\u0013\u0005e\u0003A1A\u0005\n\u0005m\u0003\u0002CA:\u0001\u0001\u0006I!!\u0018\t\u0017\u0005U\u0004\u00011AA\u0002\u0013%\u0011q\u000f\u0005\f\u0003\u007f\u0002\u0001\u0019!a\u0001\n\u0013\t\t\tC\u0006\u0002\u000e\u0002\u0001\r\u0011!Q!\n\u0005e\u0004bCAH\u0001\u0001\u0007\t\u0019!C\u0005\u0003#C1\"!'\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002\u001c\"Y\u0011q\u0014\u0001A\u0002\u0003\u0005\u000b\u0015BAJ\u0011-\t\t\u000b\u0001a\u0001\u0002\u0004%I!a)\t\u0017\u0005-\u0006\u00011AA\u0002\u0013%\u0011Q\u0016\u0005\f\u0003c\u0003\u0001\u0019!A!B\u0013\t)\u000bC\u0006\u00024\u0002\u0001\r\u00111A\u0005\n\u0005U\u0006bCA\\\u0001\u0001\u0007\t\u0019!C\u0005\u0003sC1\"!0\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002n!Y\u0011q\u0018\u0001A\u0002\u0003\u0007I\u0011BAa\u0011-\tY\r\u0001a\u0001\u0002\u0004%I!!4\t\u0017\u0005E\u0007\u00011A\u0001B\u0003&\u00111\u0019\u0005\b\u0003'\u0004A\u0011AAk\u0011\u001d\ti\u000f\u0001C\u0001\u0003+Dq!a>\u0001\t\u0003\t)\u000eC\u0004\u0003\u0002\u0001!\t!!6\t\u000f
\t\u0015\u0001\u0001\"\u0001\u0002V\"9!\u0011\u0002\u0001\u0005\u0002\u0005U\u0007b\u0002B\u0007\u0001\u0011\u0005\u0011Q\u001b\u0005\b\u0005#\u0001A\u0011AAk\u0011\u001d\u0011)\u0002\u0001C\u0005\u0005/AqA!\u001e\u0001\t\u0013\u00119\bC\u0004\u0003z\u0001!IAa\u001f\t\u000f\t\u001d\u0005\u0001\"\u0003\u0003\n\"9!1\u0012\u0001\u0005\n\t5\u0005b\u0002BL\u0001\u0011%!\u0011\u0014\u0005\n\u0005k\u0003\u0011\u0013!C\u0005\u0005o\u0013Ae\u00117vgR,'\u000fT5oW\u0012+7\u000f^\"p]:,7\r^5p]6\u000bg.Y4feR+7\u000f\u001e\u0006\u0003yu\nA\u0001\\5oW*\u0011ahP\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003\u0001\u000bQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001\u0007B\u0011AiR\u0007\u0002\u000b*\ta)A\u0003tG\u0006d\u0017-\u0003\u0002I\u000b\n1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#A&\u0011\u00051\u0003Q\"A\u001e\u0002\u0019\t\u0014xn[3s\u0007>tg-[4\u0016\u0003=\u0003\"\u0001U)\u000e\u0003uJ!AU\u001f\u0003\u0017-\u000bgm[1D_:4\u0017nZ\u0001\u000eEJ|7.\u001a:D_:4\u0017n\u001a\u0011\u0002\u00111Lgn\u001b(b[\u0016,\u0012A\u0016\t\u0003/rk\u0011\u0001\u0017\u0006\u00033j\u000bA\u0001\\1oO*\t1,\u0001\u0003kCZ\f\u0017BA/Y\u0005\u0019\u0019FO]5oO\u0006IA.\u001b8l\u001d\u0006lW\rI\u0001\u0007Y&t7.\u00133\u0016\u0003\u0005\u0004\"AY3\u000e\u0003\rT!\u0001\u001a.\u0002\tU$\u0018\u000e\\\u0005\u0003M\u000e\u0014A!V+J\t\u00069A.\u001b8l\u0013\u0012\u0004\u0013aD:pkJ\u001cWm\u00117vgR,'/\u00133\u0002!M|WO]2f\u00072,8\u000f^3s\u0013\u0012\u0004\u0013!\u00043fgR\u001cE.^:uKJLE-\u0001\beKN$8\t\\;ti\u0016\u0014\u0018\n\u001a\u0011\u0002\u00111Lgn\u001b#bi\u0006,\u0012A\u001c\t\u0003_Jl\u0011\u0001\u001d\u0006\u0003c~\n!A_6\n\u0005M\u0004(aD\"mkN$XM\u001d'j].$\u0015\r^1\u0002\u00131Lgn\u001b#bi\u0006\u0004\u0013!\u00037j].\u0004&o\u001c9t+\u00059\bC\u00012y\u0013\tI8M\u0001\u0006Qe>\u0004XM\u001d;jKN\f!\u0002\\5oWB\u0013x\u000e]:!\u0003)\u0019wN\u001c;s_2dWM]\u000b\u0002{B\u0019a0!\u0001\u000e\u0003}T!a_ 
\n\u0007\u0005\rqPA\bLC\u001a\\\u0017mQ8oiJ|G\u000e\\3s\u0003-\u0019wN\u001c;s_2dWM\u001d\u0011\u0002\u00171Lgn['b]\u0006<WM]\u000b\u0003\u0003\u0017\u00012\u0001TA\u0007\u0013\r\tya\u000f\u0002\u0013\u00072,8\u000f^3s\u0019&t7.T1oC\u001e,'/\u0001\u0007mS:\\W*\u00198bO\u0016\u0014\b%A\u0007oKR<xN]6DY&,g\u000e^\u000b\u0003\u0003/\u0001B!!\u0007\u0002*5\u0011\u00111\u0004\u0006\u0005\u0003;\ty\"A\u0004dY&,g\u000e^:\u000b\u0007\u0001\u000b\tC\u0003\u0003\u0002$\u0005\u0015\u0012AB1qC\u000eDWM\u0003\u0002\u0002(\u0005\u0019qN]4\n\t\u0005-\u00121\u0004\u0002\u000e\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\u0002\u001d9,Go^8sW\u000ec\u0017.\u001a8uA\u0005!\u0012\rZ7j]6+G/\u00193bi\u0006l\u0015M\\1hKJ,\"!a\r\u0011\t\u0005U\u0012qH\u0007\u0003\u0003oQA!!\u000f\u0002<\u0005I\u0011N\u001c;fe:\fGn\u001d\u0006\u0005\u0003{\tY\"A\u0003bI6Lg.\u0003\u0003\u0002B\u0005]\"\u0001F!e[&tW*\u001a;bI\u0006$\u0018-T1oC\u001e,'/A\u000bbI6Lg.T3uC\u0012\fG/Y'b]\u0006<WM\u001d\u0011\u0002\u000f5,GO]5dgV\u0011\u0011\u0011\n\t\u0005\u0003\u0017\n\u0019&\u0004\u0002\u0002N)!\u0011QIA(\u0015\u0011\t\t&a\b\u0002\r\r|W.\\8o\u0013\u0011\t)&!\u0014\u0003\u000f5+GO]5dg\u0006AQ.\u001a;sS\u000e\u001c\b%A\u0007dY>\u001cX\rZ\"mS\u0016tGo]\u000b\u0003\u0003;\u0002b!a\u0018\u0002j\u00055TBAA1\u0015\u0011\t\u0019'!\u001a\u0002\u000f5,H/\u00192mK*\u0019\u0011qM#\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002l\u0005\u0005$aC!se\u0006L()\u001e4gKJ\u00042\u0001TA8\u0013\r\t\th\u000f\u0002\u0017\u00072,8\u000f^3s\u0019&t7.\u00113nS:\u001cE.[3oi\u0006q1\r\\8tK\u0012\u001cE.[3oiN\u0004\u0013A\u00037j].\u001cuN\u001c4jOV\u0011\u0011\u0011\u0010\t\u0004\u0019\u0006m\u0014bAA?w\t\t2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4\u0002\u001d1Lgn[\"p]\u001aLwm\u0018\u0013fcR!\u00111QAE!\r!\u0015QQ\u0005\u0004\u0003\u000f+%\u0001B+oSRD\u0011\"a#\u001e\u0003\u0003\u0005\r!!\u001f\u0002\u0007a$\u0013'A\u0006mS:\\7i\u001c8gS\u001e\u0004\u0013a\u00037j].lU\r\u001e:jGN,\"!a%\u0011\u00071\u000b)*C\u0002\u0002\u0018n\u0012!c\u00117vgR
,'\u000fT5oW6+GO]5dg\u0006yA.\u001b8l\u001b\u0016$(/[2t?\u0012*\u0017\u000f\u0006\u0003\u0002\u0004\u0006u\u0005\"CAFA\u0005\u0005\t\u0019AAJ\u00031a\u0017N\\6NKR\u0014\u0018nY:!\u0003-\u0019wN\u001c8NC:\fw-\u001a:\u0016\u0005\u0005\u0015\u0006c\u0001'\u0002(&\u0019\u0011\u0011V\u001e\u0003A\rcWo\u001d;fe2Kgn\u001b#fgR\u001cuN\u001c8fGRLwN\\'b]\u0006<WM]\u0001\u0010G>tg.T1oC\u001e,'o\u0018\u0013fcR!\u00111QAX\u0011%\tYiIA\u0001\u0002\u0004\t)+\u0001\u0007d_:tW*\u00198bO\u0016\u0014\b%A\u0006sK6|G/Z!e[&tWCAA7\u0003=\u0011X-\\8uK\u0006#W.\u001b8`I\u0015\fH\u0003BAB\u0003wC\u0011\"a#'\u0003\u0003\u0005\r!!\u001c\u0002\u0019I,Wn\u001c;f\u0003\u0012l\u0017N\u001c\u0011\u0002\u00151|7-\u00197BI6Lg.\u0006\u0002\u0002DB!\u0011QYAd\u001b\t\tY$\u0003\u0003\u0002J\u0006m\"AD\"p]\u001adW/\u001a8u\u0003\u0012l\u0017N\\\u0001\u000fY>\u001c\u0017\r\\!e[&tw\fJ3r)\u0011\t\u0019)a4\t\u0013\u0005-\u0015&!AA\u0002\u0005\r\u0017a\u00037pG\u0006d\u0017\tZ7j]\u0002\nQa]3u+B$\"!a!)\u0007-\nI\u000e\u0005\u0003\u0002\\\u0006%XBAAo\u0015\u0011\ty.!9\u0002\u0007\u0005\u0004\u0018N\u0003\u0003\u0002d\u0006\u0015\u0018a\u00026va&$XM\u001d\u0006\u0005\u0003O\f)#A\u0003kk:LG/\u0003\u0003\u0002l\u0006u'A\u0003\"fM>\u0014X-R1dQ\u0006AA/Z1s\t><h\u000eK\u0002-\u0003c\u0004B!a7\u0002t&!\u0011Q_Ao\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\u000euKN$8i\u001c8oK\u000e$\u0018n\u001c8N_\u0012,w*\u001e;c_VtG\rK\u0002.\u0003w\u0004B!a7\u0002~&!\u0011q`Ao\u0005\u0011!Vm\u001d;\u00023Q,7\u000f^\"p]:,7\r^5p]6{G-Z%oE>,h\u000e\u001a\u0015\u0004]\u0005m\u0018a\b;fgR\u0004VM]:jgR,g\u000e^\"p]:,7\r^5p]\u001a\u000b\u0017\u000e\\;sK\"\u001aq&a?\u0002\u001fQ,7\u000f\u001e*fG>tg-[4ve\u0016D3\u0001MA~\u0003%!Xm\u001d;QCV\u001cX\rK\u00022\u0003w\fA\u0002^3ti2{w\rT3wK2D3AMA~\u000391XM]5gs2{w\rT3wK2$\u0002\"a!\u0003\u001a\t-#1\f\u0005\b\u00057\u0019\u0004\u0019\u0001B\u000f\u0003\u0015\u0019G.\u0019>{a\u0011\u0011yB!\u000f\u0011\r\t\u0005\"q\u0006B\u001b\u001d\u0011\u0011\u0019Ca\u000b\u0011\u0007\t\u0015R)\u0004\u0002\u0
003()\u0019!\u0011F!\u0002\rq\u0012xn\u001c;?\u0013\r\u0011i#R\u0001\u0007!J,G-\u001a4\n\t\tE\"1\u0007\u0002\u0006\u00072\f7o\u001d\u0006\u0004\u0005[)\u0005\u0003\u0002B\u001c\u0005sa\u0001\u0001\u0002\u0007\u0003<\te\u0011\u0011!A\u0001\u0006\u0003\u0011iDA\u0002`IE\nBAa\u0010\u0003FA\u0019AI!\u0011\n\u0007\t\rSIA\u0004O_RD\u0017N\\4\u0011\u0007\u0011\u00139%C\u0002\u0003J\u0015\u00131!\u00118z\u0011\u001d\u0011ie\ra\u0001\u0005\u001f\n\u0001\"\u00199qK:$WM\u001d\t\u0005\u0005#\u00129&\u0004\u0002\u0003T)\u0019!QK \u0002\u000bU$\u0018\u000e\\:\n\t\te#1\u000b\u0002\u0013\u0019><7)\u00199ukJ,\u0017\t\u001d9f]\u0012,'\u000fC\u0004\u0003^M\u0002\rAa\u0018\u0002\u00175\f\u0007\u0010T8h\u0019\u00164X\r\u001c\t\u0006\t\n\u0005$QM\u0005\u0004\u0005G*%AB(qi&|g\u000e\u0005\u0003\u0003h\tETB\u0001B5\u0015\u0011\u0011YG!\u001c\u0002\u000b\u00154XM\u001c;\u000b\t\t=\u0014QE\u0001\u0006g24GG[\u0005\u0005\u0005g\u0012IGA\u0003MKZ,G.\u0001\nde\u0016\fG/\u001a\"s_.,'oQ8oM&<G#A(\u0002-M,G/\u001e9D_:tWm\u0019;j_:l\u0015M\\1hKJ$B!a!\u0003~!9!qP\u001bA\u0002\t\u0005\u0015AD2p]:,7\r^5p]6{G-\u001a\t\u0004\u0019\n\r\u0015b\u0001BCw\tq1i\u001c8oK\u000e$\u0018n\u001c8N_\u0012,\u0017\u0001E2sK\u0006$X\rT8dC2\fE-\\5o)\t\t\u0019-A\tde\u0016\fG/\u001a*f[>$X-\u00113nS:$b!!\u001c\u0003\u0010\nM\u0005b\u0002BIo\u0001\u0007\u0011\u0011P\u0001\u0007G>tg-[4\t\u000f\tUu\u00071\u0001\u0002&\u00069Q.\u00198bO\u0016\u0014\u0018a\u0003:fm\u0016\u00148/\u001a(pI\u0016$bAa'\u0003(\nE\u0006\u0003\u0002BO\u0005Gk!Aa(\u000b\t\t\u0005\u0016qJ\u0001\b]\u0016$xo\u001c:l\u0013\u0011\u0011)Ka(\u0003\u0017I+g/\u001a:tK:{G-\u001a\u0005\b\u0005SC\u0004\u0019\u0001BV\u0003\u0019qw\u000eZ3JIB\u0019AI!,\n\u0007\t=VIA\u0002J]RD\u0011Ba-9!\u0003\u0005\rAa+\u0002\u0013I,\u0017/^3ti&#\u0017!\u0006:fm\u0016\u00148/\u001a(pI\u0016$C-\u001a4bk2$HEM\u000b\u0003\u0005sSCAa+\u0003<.\u0012!Q\u0018\t\u0005\u0005\u007f\u0013I-\u0004\u0002\u0003B*!!1\u0019Bc\u0003%)hn\u00195fG.,GMC\u0002\u0003H\u0016\u000b!\"\u00198o_R\fG/[8o\u0013\u
0011\u0011YM!1\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\r")
/* loaded from: input_file:kafka/server/link/ClusterLinkDestConnectionManagerTest.class */
public class ClusterLinkDestConnectionManagerTest {
    /*
     * Unit tests for ClusterLinkDestConnectionManager.
     *
     * Each test builds a connection manager through setupConnectionManager(...), which wires in
     * factories that record the remote admin client (an EasyMock nice mock) and the local admin
     * client (a MockAdminClient subclass) in fields of this test, so assertions can observe when
     * the manager creates and closes them.
     *
     * NOTE(review): this file is decompiled from Scala; the xxx()/xxx_$eq(...) accessor pairs and
     * the $anonfun$... / ...$1 methods mirror compiler-generated members and are kept so that the
     * member set of the class is unchanged.
     */

    /** Prefix given to every LogContext created by {@link #verifyLogLevel}. */
    private static final String LOG_PREFIX = "[TEST] ";

    private final KafkaConfig brokerConfig = createBrokerConfig();
    private final String linkName = "testLink";
    private final UUID linkId = UUID.randomUUID();
    private final String sourceClusterId = "sourceCluster";
    private final String destClusterId = "destCluster";
    private final ClusterLinkData linkData = new ClusterLinkData(linkName(), linkId(), new Some(sourceClusterId()), None$.MODULE$, false);
    private final Properties linkProps = new Properties();
    private final KafkaController controller = (KafkaController) EasyMock.createNiceMock(KafkaController.class);
    private final ClusterLinkManager linkManager = (ClusterLinkManager) EasyMock.createNiceMock(ClusterLinkManager.class);
    private final NetworkClient networkClient = (NetworkClient) EasyMock.createNiceMock(NetworkClient.class);
    private final AdminMetadataManager adminMetadataManager = new AdminMetadataManager(new LogContext(), 100, 10000);
    private final Metrics metrics = new Metrics();
    /** Remote admin clients whose close() has been invoked, in invocation order. */
    private final ArrayBuffer<ClusterLinkAdminClient> closedClients = ArrayBuffer$.MODULE$.empty();
    private ClusterLinkConfig linkConfig;
    private ClusterLinkMetrics linkMetrics;
    private ClusterLinkDestConnectionManager connManager;
    /** Most recently created remote admin mock; reset to null when that mock is closed. */
    private ClusterLinkAdminClient remoteAdmin;
    /** Most recently created local admin client; reset to null when that client is closed. */
    private ConfluentAdmin kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin;

    // ------------------------------------------------------------------------------------------
    // Field accessors (Scala-style getters/setters).
    // ------------------------------------------------------------------------------------------

    private KafkaConfig brokerConfig() {
        return this.brokerConfig;
    }

    private String linkName() {
        return this.linkName;
    }

    private UUID linkId() {
        return this.linkId;
    }

    private String sourceClusterId() {
        return this.sourceClusterId;
    }

    private String destClusterId() {
        return this.destClusterId;
    }

    private ClusterLinkData linkData() {
        return this.linkData;
    }

    private Properties linkProps() {
        return this.linkProps;
    }

    private KafkaController controller() {
        return this.controller;
    }

    private ClusterLinkManager linkManager() {
        return this.linkManager;
    }

    private NetworkClient networkClient() {
        return this.networkClient;
    }

    private AdminMetadataManager adminMetadataManager() {
        return this.adminMetadataManager;
    }

    private Metrics metrics() {
        return this.metrics;
    }

    private ArrayBuffer<ClusterLinkAdminClient> closedClients() {
        return this.closedClients;
    }

    private ClusterLinkConfig linkConfig() {
        return this.linkConfig;
    }

    private void linkConfig_$eq(ClusterLinkConfig clusterLinkConfig) {
        this.linkConfig = clusterLinkConfig;
    }

    private ClusterLinkMetrics linkMetrics() {
        return this.linkMetrics;
    }

    private void linkMetrics_$eq(ClusterLinkMetrics clusterLinkMetrics) {
        this.linkMetrics = clusterLinkMetrics;
    }

    private ClusterLinkDestConnectionManager connManager() {
        return this.connManager;
    }

    private void connManager_$eq(ClusterLinkDestConnectionManager clusterLinkDestConnectionManager) {
        this.connManager = clusterLinkDestConnectionManager;
    }

    private ClusterLinkAdminClient remoteAdmin() {
        return this.remoteAdmin;
    }

    private void remoteAdmin_$eq(ClusterLinkAdminClient clusterLinkAdminClient) {
        this.remoteAdmin = clusterLinkAdminClient;
    }

    private ConfluentAdmin localAdmin() {
        return this.kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin;
    }

    public void kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin_$eq(ConfluentAdmin confluentAdmin) {
        this.kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin = confluentAdmin;
    }

    // ------------------------------------------------------------------------------------------
    // Fixture lifecycle.
    // ------------------------------------------------------------------------------------------

    /** Makes the mocked controller report itself as active for every call, then arms the mock. */
    @BeforeEach
    public void setUp() {
        EasyMock.expect(controller().isActive()).andReturn(true).anyTimes();
        EasyMock.replay(controller());
    }

    /** Shuts down the connection manager (if a test created one) and closes the metrics registry. */
    @AfterEach
    public void tearDown() {
        if (connManager() != null) {
            connManager().shutdown();
        }
        metrics().close();
    }

    // ------------------------------------------------------------------------------------------
    // Tests.
    // ------------------------------------------------------------------------------------------

    /**
     * In outbound mode, startup must not create a remote admin client, a reverse connection must
     * be rejected with InvalidRequestException, and no local admin client may be created.
     */
    @Test
    public void testConnectionModeOutbound() {
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$);
        Assertions.assertNull(remoteAdmin());
        connManager().startup();
        Assertions.assertNull(remoteAdmin());

        KafkaChannel channel = (KafkaChannel) EasyMock.createNiceMock(KafkaChannel.class);
        Assertions.assertThrows(InvalidRequestException.class,
            () -> connManager().processReverseConnection(channel, reverseNode(1, -1)));
        Assertions.assertNull(localAdmin(), "Local admin client created unnecessarily for outbound dest connection manager");
        connManager().shutdown();
    }

    /**
     * In inbound mode, startup creates the remote admin client; a first reverse connection is
     * counted as one persistent and one reverse connection; a second one (node 2, request id 5)
     * is rejected with NetworkException and leaves both counts at 1; shutdown closes the local
     * admin client.
     */
    @Test
    public void testConnectionModeInbound() {
        setupConnectionManager(ConnectionMode$Inbound$.MODULE$);
        Assertions.assertNull(remoteAdmin());
        connManager().startup();
        Assertions.assertNotNull(remoteAdmin());
        Assertions.assertTrue(connManager().reverseConnectionProvider(networkClient(), new Some(adminMetadataManager()), "").nonEmpty());

        KafkaChannel channel = (KafkaChannel) EasyMock.createNiceMock(KafkaChannel.class);
        connManager().processReverseConnection(channel, reverseNode(1, -1));
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(1, connManager().reverseConnectionCount());

        ReverseNode secondNode = reverseNode(2, 5);
        Assertions.assertThrows(NetworkException.class,
            () -> connManager().processReverseConnection(channel, secondNode));
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(1, connManager().reverseConnectionCount());

        Assertions.assertNotNull(localAdmin(), "Local admin client not created for inbound dest connection manager");
        connManager().shutdown();
        Assertions.assertNull(localAdmin(), "Local admin client was not shutdown");
    }

    /**
     * When the network client fails while taking over a reverse channel, the failure must
     * propagate to the caller of processReverseConnection as a RuntimeException.
     *
     * @throws Exception declared only because recording {@code KafkaChannel.close()} on the mock
     *                   is a call to a method that presumably declares a checked IOException
     */
    @Test
    public void testPersistentConnectionFailure() throws Exception {
        setupConnectionManager(ConnectionMode$Inbound$.MODULE$);
        connManager().startup();

        // Record a single expected close() on the channel mock.
        KafkaChannel channel = (KafkaChannel) EasyMock.createNiceMock(KafkaChannel.class);
        channel.close();
        EasyMock.expectLastCall().once();
        EasyMock.replay(channel);

        ReverseNode node = reverseNode(1, -1);

        // Make the network client throw when it is asked to adopt the reverse channel.
        networkClient().reverseAndAdd(EasyMock.anyObject(ReverseChannel.class));
        EasyMock.expectLastCall().andThrow(new RuntimeException("Test exception")).once();
        EasyMock.replay(networkClient());

        Assertions.assertThrows(RuntimeException.class,
            () -> connManager().processReverseConnection(channel, node));
    }

    /**
     * Reconfiguring with a changed "metadata.max.age.ms" must replace (and close) the remote
     * admin client while keeping the same local admin client instance.
     */
    @Test
    public void testReconfigure() {
        setupConnectionManager(ConnectionMode$Inbound$.MODULE$);
        connManager().startup();
        ConfluentAdmin localAdminBefore = localAdmin();
        ClusterLinkAdminClient remoteAdminBefore = remoteAdmin();

        linkProps().setProperty("metadata.max.age.ms", "1000");
        rebuildLinkConfig();
        connManager().reconfigure(linkConfig(), scalaSetOf("metadata.max.age.ms"));

        Assertions.assertEquals(linkConfig().originals(), connManager().currentConfig().originals());
        Assertions.assertNotNull(remoteAdmin());
        Assertions.assertNotSame(remoteAdminBefore, remoteAdmin());
        Assertions.assertEquals(new $colon.colon(remoteAdminBefore, Nil$.MODULE$), closedClients().toSeq());
        Assertions.assertSame(localAdminBefore, localAdmin());
    }

    /**
     * Pausing the link must close both admin clients; un-pausing it must create them again.
     */
    @Test
    public void testPause() {
        setupConnectionManager(ConnectionMode$Inbound$.MODULE$);
        connManager().startup();
        ClusterLinkAdminClient remoteAdminBefore = remoteAdmin();
        String pausedProp = ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp();

        // Pause.
        linkProps().setProperty(pausedProp, "true");
        rebuildLinkConfig();
        connManager().reconfigure(linkConfig(), scalaSetOf(pausedProp));
        Assertions.assertEquals(linkConfig().originals(), connManager().currentConfig().originals());
        Assertions.assertEquals(new $colon.colon(remoteAdminBefore, Nil$.MODULE$), closedClients().toSeq());
        Assertions.assertNull(localAdmin(), "Local admin client was not shutdown");
        Assertions.assertNull(remoteAdmin(), "Remote admin client was not shutdown");

        // Resume.
        linkProps().setProperty(pausedProp, "false");
        rebuildLinkConfig();
        connManager().reconfigure(linkConfig(), scalaSetOf(pausedProp));
        Assertions.assertNotNull(localAdmin(), "Local admin client was not recreated");
        Assertions.assertNotNull(remoteAdmin(), "Remote admin client was not recreated");
    }

    /**
     * Runs {@link #verifyLogLevel} with no maximum level, with a null maximum level, and with
     * every slf4j level, while capturing everything logged for ClusterLinkDestConnectionManager.
     * The class logger level and the capture appender are restored/removed afterwards.
     */
    @Test
    public void testLogLevel() {
        Class<?> cls = ClusterLinkDestConnectionManager.class;
        Level previousLevel = LogCaptureAppender$.MODULE$.setClassLoggerLevel(cls, Level.ALL);
        LogCaptureAppender appender = LogCaptureAppender$.MODULE$.createAndRegister();
        try {
            verifyLogLevel(cls, appender, None$.MODULE$);
            verifyLogLevel(cls, appender, new Some((Object) null));
            for (org.slf4j.event.Level level : org.slf4j.event.Level.values()) {
                $anonfun$testLogLevel$1(this, cls, appender, level);
            }
        } finally {
            LogCaptureAppender$.MODULE$.unregister(appender);
            LogCaptureAppender$.MODULE$.setClassLoggerLevel(cls, previousLevel);
            appender.close();
        }
    }

    // ------------------------------------------------------------------------------------------
    // Helpers.
    // ------------------------------------------------------------------------------------------

    /**
     * Logs one message at each slf4j level through a LogContext logger and checks the level and
     * text of the event captured for each.
     *
     * @param cls                class whose logger is obtained from the LogContext
     * @param logCaptureAppender appender from which the last captured event is read
     * @param option             None: the LogContext is built from the prefix only;
     *                           Some(level): the LogContext is built with an AtomicReference that
     *                           is set to {@code level} after the logger has been created.
     *                           A null level (Some(null)) and None both make ERROR the expected
     *                           maximum.
     * @throws MatchError if {@code option} is neither a Some nor None (e.g. null)
     */
    private void verifyLogLevel(Class<?> cls, LogCaptureAppender logCaptureAppender, Option<org.slf4j.event.Level> option) {
        AtomicReference<org.slf4j.event.Level> maxLevelRef = new AtomicReference<>();
        LogContext logContext;
        if (option instanceof Some) {
            logContext = new LogContext(LOG_PREFIX, maxLevelRef);
        } else if (None$.MODULE$.equals(option)) {
            logContext = new LogContext(LOG_PREFIX);
        } else {
            throw new MatchError(option);
        }
        Logger logger = logContext.logger(cls);

        // Resolve the expected maximum exactly once; Some(null) must fall back to ERROR rather
        // than flow a null level into the assertions.
        org.slf4j.event.Level expectedMax = org.slf4j.event.Level.ERROR;
        if (option.isDefined()) {
            org.slf4j.event.Level configured = option.get();
            // The reference is populated only after the logger exists, as in the original flow.
            maxLevelRef.set(configured);
            if (configured != null) {
                expectedMax = configured;
            }
        }

        logger.trace("trace message");
        assertLastLog$1(org.slf4j.event.Level.TRACE, "[TEST] trace message", logCaptureAppender, expectedMax);
        logger.debug("debug message");
        assertLastLog$1(org.slf4j.event.Level.DEBUG, "[TEST] debug message", logCaptureAppender, expectedMax);
        logger.info("info message");
        assertLastLog$1(org.slf4j.event.Level.INFO, "[TEST] info message", logCaptureAppender, expectedMax);
        logger.warn("warn message");
        assertLastLog$1(org.slf4j.event.Level.WARN, "[TEST] warn message", logCaptureAppender, expectedMax);
        logger.error("error message");
        assertLastLog$1(org.slf4j.event.Level.ERROR, "[TEST] error message", logCaptureAppender, expectedMax);
    }

    /**
     * Builds the broker configuration used by every test: the properties returned by
     * TestUtils.createBrokerConfig for broker 1 with ZooKeeper connect string "localhost:1234",
     * plus the cluster-link enable property set to "true".
     */
    private KafkaConfig createBrokerConfig() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        // NOTE(review): the positional arguments after the connect string are reproduced from the
        // compiled call (presumably the Scala default arguments); confirm their meaning against
        // TestUtils.createBrokerConfig before changing any of them.
        Properties props = testUtils.createBrokerConfig(
            1, "localhost:1234", true, true,
            testUtils.RandomPort(), None$.MODULE$, None$.MODULE$, None$.MODULE$,
            true,
            false, testUtils.RandomPort(),
            false, testUtils.RandomPort(),
            false, testUtils.RandomPort(),
            None$.MODULE$, 1, false, 1, (short) 1);
        props.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(props);
    }

    /**
     * Creates link config, link metrics (started) and the connection manager under test for the
     * given connection mode. Outbound mode additionally gets a "bootstrap.servers" property.
     * The manager is handed factories that route admin-client creation back into this test.
     */
    private void setupConnectionManager(ConnectionMode connectionMode) {
        linkProps().put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        if (ConnectionMode$Outbound$.MODULE$.equals(connectionMode)) {
            linkProps().put("bootstrap.servers", "localhost:123");
        }
        rebuildLinkConfig();

        linkMetrics_$eq(new ClusterLinkMetrics(linkName(), linkId(), LinkMode$Destination$.MODULE$, linkManager(), None$.MODULE$, metrics(), None$.MODULE$));
        linkMetrics().startup();

        connManager_$eq(new ClusterLinkDestConnectionManager(
            linkData(), linkConfig(), destClusterId(), None$.MODULE$, linkMetrics(),
            (config, manager) -> createRemoteAdmin(config, manager),
            ignored -> createLocalAdmin(),
            controller(), brokerConfig(), new MockTime()));
    }

    /** Re-creates {@code linkConfig} from the current contents of {@code linkProps}. */
    private void rebuildLinkConfig() {
        linkConfig_$eq(ClusterLinkConfig$.MODULE$.create(linkProps(), None$.MODULE$, true));
    }

    /** Wraps the given strings in a Scala Set, as expected by reconfigure(...). */
    @SuppressWarnings("unchecked") // Set$.apply is erased to Object; the elements are Strings by construction.
    private Set<String> scalaSetOf(String... keys) {
        return (Set<String>) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(keys));
    }

    /**
     * Local-admin factory handed to the connection manager. Creates a MockAdminClient that clears
     * this test's {@code localAdmin} field when it is closed, stores it in that field and returns it.
     */
    public ConfluentAdmin createLocalAdmin() {
        kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin_$eq(new MockAdminClient() {
            @Override
            public void close(Duration duration) {
                super.close(duration);
                // Lets tests observe that the manager closed its local admin client.
                ClusterLinkDestConnectionManagerTest.this
                    .kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin_$eq(null);
            }
        });
        return localAdmin();
    }

    /**
     * Remote-admin factory handed to the connection manager. Verifies it is called with the
     * current link config and with the manager under test, then creates a nice mock whose
     * close() records the mock in {@code closedClients} and clears {@code remoteAdmin}, and whose
     * networkClient()/metadataManager() return this test's fixtures.
     */
    public ClusterLinkAdminClient createRemoteAdmin(ClusterLinkConfig clusterLinkConfig, ClusterLinkDestConnectionManager clusterLinkDestConnectionManager) {
        Assertions.assertEquals(linkConfig().originals(), clusterLinkConfig.originals());
        Assertions.assertSame(connManager(), clusterLinkDestConnectionManager);

        remoteAdmin_$eq((ClusterLinkAdminClient) EasyMock.createNiceMock(ClusterLinkAdminClient.class));
        remoteAdmin().close();
        EasyMock.expectLastCall().andAnswer(() -> {
            $anonfun$createRemoteAdmin$1(this);
            return null;
        }).anyTimes();
        EasyMock.expect(remoteAdmin().networkClient()).andReturn(networkClient()).anyTimes();
        EasyMock.expect(remoteAdmin().metadataManager()).andReturn(adminMetadataManager()).anyTimes();
        EasyMock.replay(remoteAdmin());
        return remoteAdmin();
    }

    /**
     * Builds a ReverseNode for this link on host "host&lt;i&gt;", port 1234, PLAINTEXT listener and
     * anonymous principal. {@code i} is passed for both of the first two constructor arguments;
     * {@code i2} is passed after the link id (named requestId in the Scala source).
     */
    private ReverseNode reverseNode(int i, int i2) {
        return new ReverseNode(i, i, "host" + i, 1234, linkId(), i2, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), KafkaPrincipal.ANONYMOUS);
    }

    /** Scala default value of the second parameter of {@link #reverseNode(int, int)}. */
    private int reverseNode$default$2() {
        return -1;
    }

    /** Body of the per-level step of {@link #testLogLevel()}: verifies with Some(level). */
    public static final /* synthetic */ void $anonfun$testLogLevel$1(ClusterLinkDestConnectionManagerTest clusterLinkDestConnectionManagerTest, Class cls, LogCaptureAppender logCaptureAppender, org.slf4j.event.Level level) {
        clusterLinkDestConnectionManagerTest.verifyLogLevel(cls, logCaptureAppender, new Some(level));
    }

    /**
     * Asserts on the last event captured by the appender.
     *
     * @param level              level at which the message was logged
     * @param str                expected message text (including prefix)
     * @param logCaptureAppender appender holding the captured events
     * @param level2             expected maximum level; the expected event level is whichever of
     *                           {@code level} and {@code level2} has the smaller {@code toInt()}
     */
    private static final void assertLastLog$1(org.slf4j.event.Level level, String str, LogCaptureAppender logCaptureAppender, org.slf4j.event.Level level2) {
        LoggingEvent lastEvent = (LoggingEvent) logCaptureAppender.getMessages().last();
        org.slf4j.event.Level expectedLevel = level2.toInt() < level.toInt() ? level2 : level;
        Assertions.assertEquals(expectedLevel.name(), lastEvent.getLevel().toString());
        Assertions.assertEquals(str, lastEvent.getMessage());
    }

    /** Body of the close() answer of the remote admin mock: record it as closed, then clear the field. */
    public static final /* synthetic */ void $anonfun$createRemoteAdmin$1(ClusterLinkDestConnectionManagerTest clusterLinkDestConnectionManagerTest) {
        clusterLinkDestConnectionManagerTest.closedClients().$plus$eq(clusterLinkDestConnectionManagerTest.remoteAdmin());
        clusterLinkDestConnectionManagerTest.remoteAdmin_$eq(null);
    }
}
