package kafka.server.link;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import kafka.controller.KafkaController;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.CoreUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.utils.Time;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ClusterLinkDestConnectionManager.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\rv!\u0002\u0016,\u0011\u0003\u0011d!\u0002\u001b,\u0011\u0003)\u0004\"\u0002\u001f\u0002\t\u0003i\u0004b\u0002 \u0002\u0005\u0004%\ta\u0010\u0005\u0007\u0019\u0006\u0001\u000b\u0011\u0002!\u0007\tQZ\u0003!\u0014\u0005\n1\u0016\u0011\t\u0011)A\u00053~C\u0001\u0002Y\u0003\u0003\u0002\u0003\u0006I!\u0019\u0005\nI\u0016\u0011\t\u0011)A\u0005KBD\u0001\"]\u0003\u0003\u0002\u0003\u0006IA\u001d\u0005\u000b\u0003\u0003)!\u0011!Q\u0001\n\u0005\r\u0001BCA\u0005\u000b\t\u0005\t\u0015!\u0003\u0002\f!Q\u0011\u0011D\u0003\u0003\u0002\u0003\u0006I!a\u0007\t\u0015\u0005\u001dRA!b\u0001\n\u0003\tI\u0003\u0003\u0006\u00026\u0015\u0011\t\u0011)A\u0005\u0003WA!\"a\u000e\u0006\u0005\u0003\u0005\u000b\u0011BA\u001d\u0011)\t\t%\u0002B\u0001B\u0003%\u00111\t\u0005\u0007y\u0015!\t!a\u0015\t\u0013\u0005%TA1A\u0005\n\u0005-\u0004\u0002CAA\u000b\u0001\u0006I!!\u001c\t\u0011\u0005\rUA1A\u0005\n}Bq!!\"\u0006A\u0003%\u0001\t\u0003\u0005\u0002\b\u0016\u0011\r\u0011\"\u0003@\u0011\u001d\tI)\u0002Q\u0001\n\u0001C\u0001\"a#\u0006\u0005\u0004%Ia\u0010\u0005\b\u0003\u001b+\u0001\u0015!\u0003A\u0011%\ty)\u0002a\u0001\n\u0013\t\t\nC\u0005\u0002\u0016\u0016\u0001\r\u0011\"\u0003\u0002\u0018\"A\u00111U\u0003!B\u0013\t\u0019\nC\u0004\u0002.\u0016!\t%a,\t\u0011\u00055W\u0001\"\u0001,\u0003\u001fDq!!>\u0006\t\u0003\n9\u0010C\u0004\u0003\u000e\u0015!\tEa\u0004\t\u000f\t=S\u0001\"\u0003\u0003R!9!qM\u0003\u0005\n\t%\u0004b\u0002B@\u000b\u0011\u0005#\u0011\u0011\u0005\b\u0005\u001b+A\u0011\u000bBH\u0011\u001d\u0011\t*\u0002C)\u0005\u001fCqAa%\u0006\t\u0003\u0011)\nC\u0004\u0003\u001a\u0016!\tEa'\t\u000f\tuU\u0001\"\u0011\u0003\u001c\"i!qT\u0003\u0011\u0002\u0007\u0005\t\u0011\"\u0003\u0003\"B\f\u0001e\u00117vgR,'\u000fT5oW\u0012+7\u000f^\"p]:,7\r^5p]6\u000bg.Y4fe*\u0011A&L\u0001\u0005Y&t7N\u0003\u0002/_\u000511/\u001a:wKJT\u0011\u0001M\u0001\u0006W\u000647.Y\u0002\u0001!\t\u0019\u0014!D\u0001,\u0005\u0001\u001aE.^:uKJd\u0015N\\6EKN$8i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u00
1a:\u0014\u0005\u00051\u0004CA\u001c;\u001b\u0005A$\"A\u001d\u0002\u000bM\u001c\u0017\r\\1\n\u0005mB$AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002e\u0005!b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012,\u0012\u0001\u0011\t\u0003\u0003*k\u0011A\u0011\u0006\u0003\u0007\u0012\u000ba!\u0019;p[&\u001c'BA#G\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003\u000f\"\u000bA!\u001e;jY*\t\u0011*\u0001\u0003kCZ\f\u0017BA&C\u00055\tEo\\7jG&sG/Z4fe\u0006)b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\u00043cA\u0003O#B\u00111gT\u0005\u0003!.\u0012Ad\u00117vgR,'\u000fT5oW\u000e{gN\\3di&|g.T1oC\u001e,'\u000f\u0005\u0002S+:\u00111gU\u0005\u0003).\n!c\u00117vgR,'\u000fT5oW\u001a\u000b7\r^8ss&\u0011ak\u0016\u0002\u0016\t\u0016\u001cHoQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3s\u0015\t!6&\u0001\u0005mS:\\G)\u0019;b!\tQV,D\u0001\\\u0015\tav&\u0001\u0002{W&\u0011al\u0017\u0002\u0010\u00072,8\u000f^3s\u0019&t7\u000eR1uC&\u0011\u0001lT\u0001\u000eS:LG/[1m\u0007>tg-[4\u0011\u0005M\u0012\u0017BA2,\u0005E\u0019E.^:uKJd\u0015N\\6D_:4\u0017nZ\u0001\u0014Y>\u001c\u0017\r\u001c'pO&\u001c\u0017\r\\\"mkN$XM\u001d\t\u0003M6t!aZ6\u0011\u0005!DT\"A5\u000b\u0005)\f\u0014A\u0002\u001fs_>$h(\u0003\u0002mq\u00051\u0001K]3eK\u001aL!A\\8\u0003\rM#(/\u001b8h\u0015\ta\u0007(\u0003\u0002e\u001f\u0006\t2\r\\5f]RLe\u000e^3sG\u0016\u0004Ho\u001c:\u0011\u0007]\u001aX/\u0003\u0002uq\t1q\n\u001d;j_:\u0004\"A\u001e@\u000e\u0003]T!\u0001_=\u0002\u000f\rd\u0017.\u001a8ug*\u0011\u0001G\u001f\u0006\u0003wr\fa!\u00199bG\",'\"A?\u0002\u0007=\u0014x-\u0003\u0002��o\n\t2\t\\5f]RLe\u000e^3sG\u0016\u0004Ho\u001c:\u0002\u000f5,GO]5dgB\u00191'!\u0002\n\u0007\u0005\u001d1F\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018A\u0005:f[>$X-\u00113nS:4\u0015m\u0019;pef\u0004\u0002bNA\u0007C\u0006E\u00111C\u0005\u0004\u0003\u001fA$!\u0003$v]\u000e$\u0018n\u001c83!\t\u0019T\u0001E\u00024\u0003+I1!a\u0006,\u0005Y\u0019E.^:uKJd\u0015N\\6BI6Lgn\u00117jK:$\u0018A\u00037pG\u0006d\u0017\tZ7j]B!\u0011QDA\u0012\u001b\t\tyBC\u0002\u00
02\"]\fQ!\u00193nS:LA!!\n\u0002 \tq1i\u001c8gYV,g\u000e^!e[&t\u0017AC2p]R\u0014x\u000e\u001c7feV\u0011\u00111\u0006\t\u0005\u0003[\t\t$\u0004\u0002\u00020)\u0019\u0011qE\u0018\n\t\u0005M\u0012q\u0006\u0002\u0010\u0017\u000647.Y\"p]R\u0014x\u000e\u001c7fe\u0006Y1m\u001c8ue>dG.\u001a:!\u00031\u0011'o\\6fe\u000e{gNZ5h!\u0011\tY$!\u0010\u000e\u00035J1!a\u0010.\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\tQLW.\u001a\t\u0005\u0003\u000b\ny%\u0004\u0002\u0002H)!\u0011\u0011JA&\u0003\u0015)H/\u001b7t\u0015\r\ti%_\u0001\u0007G>lWn\u001c8\n\t\u0005E\u0013q\t\u0002\u0005)&lW\r\u0006\f\u0002\u0012\u0005U\u0013qKA-\u00037\ni&a\u0018\u0002b\u0005\r\u0014QMA4\u0011\u0015A\u0016\u00031\u0001Z\u0011\u0015\u0001\u0017\u00031\u0001b\u0011\u0015!\u0017\u00031\u0001f\u0011\u0015\t\u0018\u00031\u0001s\u0011\u001d\t\t!\u0005a\u0001\u0003\u0007Aq!!\u0003\u0012\u0001\u0004\tY\u0001C\u0004\u0002\u001aE\u0001\r!a\u0007\t\u000f\u0005\u001d\u0012\u00031\u0001\u0002,!9\u0011qG\tA\u0002\u0005e\u0002bBA!#\u0001\u0007\u00111I\u0001\u0013G>tg.Z2uS>t'+Z9vKN$8/\u0006\u0002\u0002nAA\u0011qNA9\u0003k\nY(D\u0001E\u0013\r\t\u0019\b\u0012\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bcA\u001c\u0002x%\u0019\u0011\u0011\u0010\u001d\u0003\u0007%sG\u000fE\u00024\u0003{J1!a 
,\u00055\u0011VM^3sg\u0016\u001cE.[3oi\u0006\u00192m\u001c8oK\u000e$\u0018n\u001c8SKF,Xm\u001d;tA\u0005!b.\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\fQC\\3yiJ+g/\u001a:tKJ+\u0017/^3ti&#\u0007%A\u000bqKJ\u001c\u0018n\u001d;f]R\u001cuN\u001c8fGRLwN\\:\u0002-A,'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8og\u0002\n\u0001$Y2uSZ,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t\u0003e\t7\r^5wKJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d\u0011\u0002-I,g/\u001a:tK\u000e{gN\\3di&|g.\u00113nS:,\"!a%\u0011\t]\u001a\u00181P\u0001\u001be\u00164XM]:f\u0007>tg.Z2uS>t\u0017\tZ7j]~#S-\u001d\u000b\u0005\u00033\u000by\nE\u00028\u00037K1!!(9\u0005\u0011)f.\u001b;\t\u0013\u0005\u00056$!AA\u0002\u0005M\u0015a\u0001=%c\u00059\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8BI6Lg\u000e\t\u0015\u00049\u0005\u001d\u0006cA\u001c\u0002*&\u0019\u00111\u0016\u001d\u0003\u0011Y|G.\u0019;jY\u0016\f\u0011#\u001a8bE2,7\t\\;ti\u0016\u0014H*\u001b8l)\u0019\tI*!-\u0002<\"9\u00111W\u000fA\u0002\u0005U\u0016!\u00048fi^|'o[\"mS\u0016tG\u000fE\u00024\u0003oK1!!/,\u0005a\u0019E.^:uKJd\u0015N\\6OKR<xN]6DY&,g\u000e\u001e\u0005\b\u0003{k\u0002\u0019AA`\u0003=iW\r^1eCR\fW*\u00198bO\u0016\u0014\b\u0003B\u001ct\u0003\u0003\u0004B!a1\u0002J6\u0011\u0011Q\u0019\u0006\u0005\u0003\u000f\fy\"A\u0005j]R,'O\\1mg&!\u00111ZAc\u0005Q\tE-\\5o\u001b\u0016$\u0018\rZ1uC6\u000bg.Y4fe\u0006I\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8Qe>4\u0018\u000eZ3s)!\t\t.a:\u0002p\u0006E\b\u0003B\u001ct\u0003'\u0004B!!6\u0002b:!\u0011q[Ao\u001b\t\tIN\u0003\u0003\u0002\\\u0006-\u0013a\u00028fi^|'o[\u0005\u0005\u0003?\fI.A\u0006SKZ,'o]3O_\u0012,\u0017\u0002BAr\u0003K\u0014!cQ8o]\u0016\u001cG/[8o!J|g/\u001b3fe*!\u0011q\\Am\u0011\u001d\t\u0019L\ba\u0001\u0003S\u00042A^Av\u0013\r\tio\u001e\u0002\u000e\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\t\u000f\u0005uf\u00041\u0001\u0002@\"1\u00111\u001f\u0010A\u0002\u0015\f\u0001b\u00197jK:$\u0018\nZ\u0001\u0019aJ|7-Z:t%\u00164XM]:f\u0007>tg.Z2uS>tGCBAM\u0003s\u0014\u0019\u0001C\u0004\u0002|~\u0001\r!!@\u0002\u000f\rD\u00
17M\u001c8fYB!\u0011q[A��\u0013\u0011\u0011\t!!7\u0003\u0019-\u000bgm[1DQ\u0006tg.\u001a7\t\u000f\t\u0015q\u00041\u0001\u0003\b\u0005Y!/\u001a<feN,gj\u001c3f!\u0011\t9N!\u0003\n\t\t-\u0011\u0011\u001c\u0002\f%\u00164XM]:f\u001d>$W-\u0001\u000ej]&$\u0018.\u0019;f%\u00164XM]:f\u0007>tg.Z2uS>t7\u000f\u0006\u0004\u0003\u0012\tU\"Q\t\t\u0007\u0005'\u0011iBa\t\u000f\t\tU!\u0011\u0004\b\u0004Q\n]\u0011\"A\u001d\n\u0007\tm\u0001(A\u0004qC\u000e\\\u0017mZ3\n\t\t}!\u0011\u0005\u0002\u0004'\u0016\f(b\u0001B\u000eqA1\u0011q\u000eB\u0013\u0005SI1Aa\nE\u0005E\u0019u.\u001c9mKR\f'\r\\3GkR,(/\u001a\t\u0005\u0005W\u0011\t$\u0004\u0002\u0003.)\u0019!q\u0006%\u0002\t1\fgnZ\u0005\u0005\u0005g\u0011iC\u0001\u0003W_&$\u0007b\u0002B\u001cA\u0001\u0007!\u0011H\u0001\u001aS:LG/[1uK\u000e{gN\\3di&|gNU3rk\u0016\u001cH\u000f\u0005\u0003\u0003<\t\u0005SB\u0001B\u001f\u0015\u0011\u0011y$a\u0013\u0002\u0011I,\u0017/^3tiNLAAa\u0011\u0003>\t\t\u0013J\\5uS\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8ogJ+\u0017/^3ti\"9!q\t\u0011A\u0002\t%\u0013A\u0004:fcV,7\u000f^\"p]R,\u0007\u0010\u001e\t\u0005\u0005w\u0011Y%\u0003\u0003\u0003N\tu\"A\u0004*fcV,7\u000f^\"p]R,\u0007\u0010^\u0001\u0019e\u0016\fX/Z:u%\u00164XM]:f\u0007>tg.Z2uS>tG\u0003CAM\u0005'\u00129Fa\u0017\t\u000f\tU\u0013\u00051\u0001\u0002v\u0005I!/Z9vKN$\u0018\n\u001a\u0005\b\u00053\n\u0003\u0019AA>\u0003\u0019\u0019G.[3oi\"9!QL\u0011A\u0002\t}\u0013\u0001\u00028pI\u0016\u0004BA!\u0019\u0003d5\u0011\u00111J\u0005\u0005\u0005K\nYE\u0001\u0003O_\u0012,\u0017!\u00074pe^\f'\u000f\u001a+p%\u0016lw\u000e^3D_:$(o\u001c7mKJ$b!!'\u0003l\tm\u0004b\u0002B7E\u0001\u0007!qN\u0001\fe\u0016\fX/Z:u\t\u0006$\u0018\r\u0005\u0003\u0003r\t]TB\u0001B:\u0015\u0011\u0011)(a\u0013\u0002\u000f5,7o]1hK&!!\u0011\u0010B:\u0005\u0015Je.\u001b;jCR,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t%\u0016\fX/Z:u\t\u0006$\u0018\rC\u0004\u0003~\t\u0002\rA!\u0005\u0002\u000f\u0019,H/\u001e:fg\u0006\u0011rN\\\"p]R\u0014x\u000e\u001c7fe\u000eC\u0017M\\4f)\u0011\tIJa!\t\u000f\t\u00155\u000
51\u0001\u0003\b\u0006\u0011\u0012n]!di&4XmQ8oiJ|G\u000e\\3s!\r9$\u0011R\u0005\u0004\u0005\u0017C$a\u0002\"p_2,\u0017M\\\u0001\u001cG2|7/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]\u0006#W.\u001b8\u0015\u0005\u0005e\u0015\u0001H2sK\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u0001\u0018e\u00164XM]:f\u0007>tg.Z2uS>t7\t\\5f]R,\"Aa&\u0011\t]\u001a\u0018\u0011^\u0001\u001aa\u0016\u00148/[:uK:$8i\u001c8oK\u000e$\u0018n\u001c8D_VtG/\u0006\u0002\u0002v\u00051\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8D_VtG/A\rtkB,'\u000f\n7pG\u0006dGj\\4jG\u0006d7\t\\;ti\u0016\u0014X#A3")
/* loaded from: input_file:kafka/server/link/ClusterLinkDestConnectionManager.class */
public class ClusterLinkDestConnectionManager extends ClusterLinkConnectionManager implements ClusterLinkFactory.DestConnectionManager {
    private final Option<ClientInterceptor> clientInterceptor;
    private final ClusterLinkMetrics metrics;
    private final Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> remoteAdminFactory;
    private final ConfluentAdmin localAdmin;
    private final KafkaController controller;
    private final KafkaConfig brokerConfig;
    private final Time time;
    private final ConcurrentHashMap<Object, ReverseClient> connectionRequests;
    private final AtomicInteger nextReverseRequestId;
    private final AtomicInteger persistentConnections;
    private final AtomicInteger activeReverseConnections;
    private volatile Option<ReverseClient> reverseConnectionAdmin;

    /**
     * Static forwarder to the Scala companion object.
     *
     * @return the {@link AtomicInteger} returned by the companion's {@code NextReverseRequestId()}
     */
    public static AtomicInteger NextReverseRequestId() {
        ClusterLinkDestConnectionManager$ companion = ClusterLinkDestConnectionManager$.MODULE$;
        return companion.NextReverseRequestId();
    }

    /**
     * Compiler-generated accessor that returns the superclass value of {@code localLogicalCluster()}.
     */
    private /* synthetic */ String super$localLogicalCluster() {
        String inherited = super.localLogicalCluster();
        return inherited;
    }

    /**
     * @return the {@link KafkaController} supplied to the constructor
     */
    public KafkaController controller() {
        return controller;
    }

    /**
     * @return the map of reverse-connection clients keyed by boxed request id
     */
    private ConcurrentHashMap<Object, ReverseClient> connectionRequests() {
        return connectionRequests;
    }

    /**
     * @return the counter used to allocate reverse-connection request ids; the constructor takes
     *         it from the companion object's {@code NextReverseRequestId()}
     */
    private AtomicInteger nextReverseRequestId() {
        return nextReverseRequestId;
    }

    /**
     * @return the counter of reverse connections that arrived without a request id
     */
    private AtomicInteger persistentConnections() {
        return persistentConnections;
    }

    /**
     * @return the counter of reverse connections that have been added and not yet disconnected
     */
    private AtomicInteger activeReverseConnections() {
        return activeReverseConnections;
    }

    /**
     * @return the current value of the volatile {@code reverseConnectionAdmin} field
     */
    private Option<ReverseClient> reverseConnectionAdmin() {
        return reverseConnectionAdmin;
    }

    /**
     * Scala-style setter for the volatile {@code reverseConnectionAdmin} field.
     *
     * @param option the new value to store
     */
    private void reverseConnectionAdmin_$eq(Option<ReverseClient> option) {
        reverseConnectionAdmin = option;
    }

    /**
     * Enables destination-side cluster linking on the client wrapped by
     * {@code clusterLinkNetworkClient}.
     *
     * <p>If the wrapped client is a {@link NetworkClient}, it is configured with this link's id,
     * the client interceptor (or {@code null}) and the reverse connection provider (or
     * {@code null}). For any other client type the call is a no-op unless the current connection
     * mode equals {@code ConnectionMode.Inbound}, in which case it fails.
     *
     * @param clusterLinkNetworkClient wrapper supplying the underlying client and its client id
     * @param option optional admin metadata manager passed on to the reverse connection provider
     * @throws IllegalStateException if the connection mode is inbound and the wrapped client is
     *         not a {@link NetworkClient}
     */
    @Override
    public void enableClusterLink(ClusterLinkNetworkClient clusterLinkNetworkClient, Option<AdminMetadataManager> option) {
        KafkaClient wrapped = clusterLinkNetworkClient.networkClient();
        if (!(wrapped instanceof NetworkClient)) {
            ConnectionMode mode = currentConfig().connectionMode();
            ConnectionMode$Inbound$ inbound = ConnectionMode$Inbound$.MODULE$;
            // Null-safe equality, matching the Scala `==` this was compiled from.
            boolean inboundMode = mode == null ? inbound == null : mode.equals(inbound);
            if (inboundMode) {
                throw new IllegalStateException("Reverse connections are supported only with NetworkClient");
            }
            return;
        }
        NetworkClient kafkaNetworkClient = (NetworkClient) wrapped;
        kafkaNetworkClient.enableDestinationClusterLink(
                super.linkData().linkId(),
                (ClientInterceptor) this.clientInterceptor.orNull(Predef$.MODULE$.$conforms()),
                (ReverseNode.ConnectionProvider) reverseConnectionProvider(kafkaNetworkClient, option, clusterLinkNetworkClient.clientId()).orNull(Predef$.MODULE$.$conforms()));
    }

    /**
     * Builds the provider a network client uses to ask for a reverse connection to a node.
     *
     * @param networkClient the client that will receive the reverse connections
     * @param option optional admin metadata manager handed to the created {@code ReverseClient}
     * @param str client id handed to the created {@code ReverseClient}
     * @return {@code None} unless the current connection mode equals {@code ConnectionMode.Inbound};
     *         otherwise a provider that calls {@link #requestReverseConnection} with a freshly
     *         incremented request id
     */
    public Option<ReverseNode.ConnectionProvider> reverseConnectionProvider(NetworkClient networkClient, Option<AdminMetadataManager> option, String str) {
        ConnectionMode mode = currentConfig().connectionMode();
        ConnectionMode$Inbound$ inbound = ConnectionMode$Inbound$.MODULE$;
        boolean inboundMode = mode == null ? inbound == null : mode.equals(inbound);
        if (!inboundMode) {
            return None$.MODULE$;
        }
        ReverseClient target = new ReverseClient(networkClient, option, ReverseClient$.MODULE$.$lessinit$greater$default$3(), str);
        ReverseNode.ConnectionProvider provider = node -> {
            int requestId = this.nextReverseRequestId().incrementAndGet();
            this.requestReverseConnection(requestId, target, node);
        };
        return new Some<>(provider);
    }

    /**
     * Attaches an incoming reverse connection from the source cluster to the client waiting for it.
     *
     * <p>A connection without a request id is treated as persistent and is given to the current
     * {@code reverseConnectionAdmin}. A connection with a request id is given to the client
     * registered under that id in {@code connectionRequests}; that entry is removed by the lookup.
     * When a client is found, the connection counters and the created-connection sensor are
     * updated, the channel is wrapped in a {@link ReverseChannel} and added to the client's
     * network client, and the client is bootstrapped with it. The disconnect callback passed to
     * the {@link ReverseChannel} reverses the counter updates.
     *
     * @param kafkaChannel the channel established by the source cluster
     * @param reverseNode the node description carrying the optional request id
     * @throws NetworkException if no client is waiting for this connection
     */
    @Override
    public void processReverseConnection(KafkaChannel kafkaChannel, ReverseNode reverseNode) {
        debug(() -> "Process reverse connection in destination cluster : " + kafkaChannel + " " + reverseNode);
        ensureReverseConnectionsEnabled();

        // No request id: this is the persistent connection owned by the reverse connection admin.
        final boolean persistent = !reverseNode.requestId().isPresent();
        final Option<ReverseClient> waiting;
        if (persistent) {
            waiting = reverseConnectionAdmin();
        } else {
            waiting = Option$.MODULE$.apply(connectionRequests().remove(reverseNode.requestId().get()));
        }

        if (!(waiting instanceof Some)) {
            if (None$.MODULE$.equals(waiting)) {
                throw new NetworkException("Reverse connection is no longer required");
            }
            throw new MatchError(waiting);
        }
        ReverseClient reverseClient = ((Some<ReverseClient>) waiting).value();

        Consumer<KafkaChannel> onDisconnect = closedChannel -> {
            this.debug(() -> "Reverse channel " + closedChannel + " has been disconnected");
            this.metrics.reverseConnectionClosedSensor().record();
            this.activeReverseConnections().decrementAndGet();
            if (!persistent) {
                return;
            }
            if (this.persistentConnections().decrementAndGet() > 0) {
                return;
            }
            if (this.controller().isActive()) {
                this.info(() -> "Persistent connection to source controller was disconnected, awaiting new connection.");
            }
        };

        activeReverseConnections().incrementAndGet();
        if (persistent) {
            persistentConnections().incrementAndGet();
        }
        this.metrics.reverseConnectionCreatedSensor().record();

        ReverseChannel reverseChannel = new ReverseChannel(kafkaChannel, reverseNode, onDisconnect);
        reverseClient.networkClient().reverseAndAdd(reverseChannel);
        reverseClient.bootstrapWithReverseChannel(reverseChannel, this.time.milliseconds());
        info(() -> "Added reverse channel " + reverseChannel + " from source to network client, requestId=" + reverseNode.requestId());
    }

    /**
     * Handles an {@code InitiateReverseConnectionsRequest} on the destination side.
     *
     * <p><b>As written here this method always throws {@link UnsupportedOperationException}</b>:
     * the decompiler could not reconstruct the body and left only the instruction dump below.
     *
     * <p>What the dump shows about the intended flow:
     * <ol>
     *   <li>log at debug level and call {@code ensureReverseConnectionsEnabled()};</li>
     *   <li>read {@code r7.data()} and build an immutable Scala {@code List} with one element per
     *       request entry via {@code List.fill} (the element factory lambda is not visible here;
     *       presumably it creates a new {@code CompletableFuture} - TODO confirm);</li>
     *   <li>inside a try block, compare {@code super.localLogicalCluster()} with the request's
     *       {@code sourceClusterId()} (null-safe); if they are equal, throw
     *       {@code InvalidRequestException} with the message "Cannot initiate reverse connection
     *       from destination cluster &lt;id&gt; to itself"; otherwise call
     *       {@code forwardToRemoteController(data, futures)};</li>
     *   <li>on any {@code Throwable}, log through {@code error(...)} and apply
     *       {@code $anonfun$initiateReverseConnections$5} to every list element, which completes
     *       it exceptionally with that throwable;</li>
     *   <li>return the list.</li>
     * </ol>
     *
     * @param r7 the request whose {@code data()} supplies the entries and the source cluster id
     * @param r8 the request context; it is never read in the instruction dump
     * @return per the dump, one future per request entry; currently never returns normally
     * @throws UnsupportedOperationException always, until the body is restored
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x0049, code lost:
    
        if (r0.equals(r1) != false) goto L10;
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> initiateReverseConnections(org.apache.kafka.common.requests.InitiateReverseConnectionsRequest r7, org.apache.kafka.common.requests.RequestContext r8) {
        /*
            r6 = this;
            r0 = r6
            r1 = r6
            r2 = r7
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$1(r1, r2);
            }
            r0.debug(r1)
            r0 = r6
            r0.ensureReverseConnectionsEnabled()
            r0 = r7
            org.apache.kafka.common.message.InitiateReverseConnectionsRequestData r0 = r0.data()
            r9 = r0
            scala.collection.immutable.List$ r0 = scala.collection.immutable.List$.MODULE$
            r1 = r9
            java.util.List r1 = r1.entries()
            int r1 = r1.size()
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r2 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$2();
            }
            scala.collection.GenTraversable r0 = r0.fill(r1, r2)
            scala.collection.immutable.List r0 = (scala.collection.immutable.List) r0
            r10 = r0
            r0 = r6
            java.lang.String r0 = super.localLogicalCluster()     // Catch: java.lang.Throwable -> L7e
            r1 = r9
            java.lang.String r1 = r1.sourceClusterId()     // Catch: java.lang.Throwable -> L7e
            r11 = r1
            r1 = r0
            if (r1 != 0) goto L44
        L3c:
            r0 = r11
            if (r0 == 0) goto L4c
            goto L74
        L44:
            r1 = r11
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> L7e
            if (r0 == 0) goto L74
        L4c:
            org.apache.kafka.common.errors.InvalidRequestException r0 = new org.apache.kafka.common.errors.InvalidRequestException     // Catch: java.lang.Throwable -> L7e
            r1 = r0
            java.lang.StringBuilder r2 = new java.lang.StringBuilder     // Catch: java.lang.Throwable -> L7e
            r3 = r2
            r4 = 70
            r3.<init>(r4)     // Catch: java.lang.Throwable -> L7e
            java.lang.String r3 = "Cannot initiate reverse connection from destination cluster "
            java.lang.StringBuilder r2 = r2.append(r3)     // Catch: java.lang.Throwable -> L7e
            r3 = r6
            java.lang.String r3 = super.localLogicalCluster()     // Catch: java.lang.Throwable -> L7e
            java.lang.StringBuilder r2 = r2.append(r3)     // Catch: java.lang.Throwable -> L7e
            java.lang.String r3 = " to itself"
            java.lang.StringBuilder r2 = r2.append(r3)     // Catch: java.lang.Throwable -> L7e
            java.lang.String r2 = r2.toString()     // Catch: java.lang.Throwable -> L7e
            r1.<init>(r2)     // Catch: java.lang.Throwable -> L7e
            throw r0     // Catch: java.lang.Throwable -> L7e
        L74:
            r0 = r6
            r1 = r9
            r2 = r10
            r0.forwardToRemoteController(r1, r2)     // Catch: java.lang.Throwable -> L7e
            goto L9c
        L7e:
            r12 = move-exception
            r0 = r6
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$3();
            }
            r2 = r12
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r2 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$4(r2);
            }
            r0.error(r1, r2)
            r0 = r10
            r1 = r12
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r1 = (v1) -> { // scala.Function1.apply(java.lang.Object):java.lang.Object
                return $anonfun$initiateReverseConnections$5$adapted(r1, v1);
            }
            r0.foreach(r1)
        L9c:
            r0 = r10
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.server.link.ClusterLinkDestConnectionManager.initiateReverseConnections(org.apache.kafka.common.requests.InitiateReverseConnectionsRequest, org.apache.kafka.common.requests.RequestContext):scala.collection.Seq");
    }

    /**
     * Asks the source cluster to open a reverse connection to this broker for {@code node}.
     *
     * <p>The request is sent through the remote controller admin when this broker is the active
     * controller, and through the local admin client otherwise. In both cases
     * {@code onCompletion$1} runs when the request's future completes.
     *
     * <p>The waiting client is registered in {@code connectionRequests} <em>before</em> the request
     * is dispatched. Registering afterwards (as this method previously did) has two problems:
     * a future that is already failed when the callback is attached runs the callback
     * synchronously, so the failure cleanup removes nothing and the later {@code put} leaves a
     * stale entry behind; and a reverse connection that arrives before the {@code put} finds no
     * waiting client. If dispatching throws, the registration is rolled back so the exception
     * leaves the map as it was.
     *
     * @param i request id used as the key in {@code connectionRequests} and as the entry's
     *        initiate request id
     * @param reverseClient the client that should receive the reverse connection
     * @param node the source node to connect from; its id is used as the source broker id
     */
    public void requestReverseConnection(int i, ReverseClient reverseClient, Node node) {
        debug(() -> "Requesting reverse connection with requestId " + i + " to node " + node + " for client " + reverseClient.clientId());
        ensureReverseConnectionsEnabled();
        InitiateReverseConnectionsRequestData entries = new InitiateReverseConnectionsRequestData().setClusterLinkId(linkId()).setForwardToBroker(true).setTimeoutMs(Predef$.MODULE$.Integer2int(currentConfig().reverseConnectionSetupTimeoutMs())).setSourceClusterId((String) super.linkData().clusterId().orNull(Predef$.MODULE$.$conforms())).setTargetClusterId(super.localLogicalCluster()).setEntries(Collections.singletonList(new InitiateReverseConnectionsRequestData.EntryData().setInitiateRequestId(i).setSourceBrokerId(node.id()).setTargetBrokerId(this.brokerConfig.brokerId())));

        final Integer requestKey = BoxesRunTime.boxToInteger(i);
        // Register first so neither an immediate failure callback nor a fast reverse connection
        // can run before the entry exists.
        connectionRequests().put(requestKey, reverseClient);
        boolean dispatched = false;
        try {
            if (controller().isActive()) {
                CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                forwardToRemoteController(entries, new $colon.colon(completableFuture, Nil$.MODULE$));
                completableFuture.whenComplete((r10, th) -> {
                    this.onCompletion$1(th, i, reverseClient, node);
                });
            } else {
                ((KafkaFutureImpl) this.localAdmin.initiateReverseConnections(entries, (Integer) null).get(requestKey)).whenComplete((r102, th2) -> {
                    this.onCompletion$1(th2, i, reverseClient, node);
                });
            }
            dispatched = true;
        } finally {
            if (!dispatched) {
                // Dispatch threw: nothing will ever complete this request, so drop the entry.
                connectionRequests().remove(requestKey);
            }
        }
    }

    /**
     * Sends an initiate-reverse-connections request through the remote controller admin client
     * and relays each per-entry result into the caller's futures.
     *
     * <p>Request entries and {@code seq} are paired positionally; pairing stops at the end of the
     * shorter of the two. For each pair, the admin future registered under the entry's initiate
     * request id completes the paired future with the same value or exception.
     *
     * @param initiateReverseConnectionsRequestData the request to forward
     * @param seq futures to complete, in the same order as the request entries
     * @throws NetworkException if no reverse connection admin client is available and this broker
     *         is the active controller
     * @throws NotControllerException if no reverse connection admin client is available and this
     *         broker is not the active controller
     */
    private void forwardToRemoteController(InitiateReverseConnectionsRequestData initiateReverseConnectionsRequestData, Seq<CompletableFuture<Void>> seq) {
        KafkaAdminClient remoteAdmin = (KafkaAdminClient) reverseConnectionAdmin()
                .flatMap(client -> client.adminClient().map(linkAdmin -> linkAdmin.admin()))
                .getOrElse(() -> {
                    if (!this.controller().isActive()) {
                        throw new NotControllerException("Request cannot be forwarded to remote controller since this broker is not the controller.");
                    }
                    throw new NetworkException("Request cannot be forwarded to remote controller at this time.");
                });
        debug(() -> "Forward initiate reverse connection request to remote controller: " + initiateReverseConnectionsRequestData);

        Map<?, ?> resultsById = remoteAdmin.initiateReverseConnections(initiateReverseConnectionsRequestData, (Integer) null);

        // Walk both sequences in lock-step, stopping at the shorter one.
        java.util.Iterator<InitiateReverseConnectionsRequestData.EntryData> entryIterator = initiateReverseConnectionsRequestData.entries().iterator();
        scala.collection.Iterator<CompletableFuture<Void>> futureIterator = seq.iterator();
        while (entryIterator.hasNext() && futureIterator.hasNext()) {
            final InitiateReverseConnectionsRequestData.EntryData entry = entryIterator.next();
            final CompletableFuture<Void> callerFuture = futureIterator.next();
            KafkaFutureImpl<Void> adminFuture = (KafkaFutureImpl<Void>) resultsById.get(BoxesRunTime.boxToInteger(entry.initiateRequestId()));
            adminFuture.whenComplete((result, failure) -> {
                if (failure == null) {
                    this.debug(() -> "Completed InitiateReverseConnectionsRequest for requestId=" + entry.initiateRequestId());
                    callerFuture.complete(result);
                    return;
                }
                this.warn(() -> "Initiate reverse connection request failed for requestId=" + entry.initiateRequestId(), () -> failure);
                callerFuture.completeExceptionally(failure);
            });
        }
    }

    /**
     * Reacts to this broker gaining or losing the controller role, under {@code stateChangeLock()}.
     *
     * <p>If the broker is now the controller and has no reverse connection admin,
     * {@code resetReverseConnectionAdmin()} is called. If it is no longer the controller but still
     * has one, the admin is closed. In the other two combinations nothing happens.
     *
     * @param z {@code true} if this broker is now the active controller
     */
    @Override
    public void onControllerChange(boolean z) {
        synchronized (stateChangeLock()) {
            boolean adminPresent = !reverseConnectionAdmin().isEmpty();
            if (z && !adminPresent) {
                resetReverseConnectionAdmin();
            } else if (!z && adminPresent) {
                closeReverseConnectionAdmin();
            }
        }
    }

    /**
     * Closes the admin client held by the current reverse connection admin, if there is one, and
     * then clears {@code reverseConnectionAdmin}.
     */
    @Override
    public void closeReverseConnectionAdmin() {
        Option<ReverseClient> current = reverseConnectionAdmin();
        Option<ClusterLinkAdminClient> adminClient = current.flatMap(client -> client.adminClient());
        if (!adminClient.isEmpty()) {
            $anonfun$closeReverseConnectionAdmin$2(this, adminClient.get());
        }
        reverseConnectionAdmin_$eq(None$.MODULE$);
    }

    /**
     * Creates a new reverse connection admin from {@code remoteAdminFactory} and stores it, but
     * only when this broker is the active controller. The debug line is logged in either case.
     */
    @Override
    public void createReverseConnectionAdmin() {
        debug(() -> "Recreate admin client used to initiate connection reversal requests");
        if (!controller().isActive()) {
            return;
        }
        ClusterLinkAdminClient remoteAdmin = (ClusterLinkAdminClient) this.remoteAdminFactory.apply(currentConfig(), this);
        ReverseClient adminReverseClient = new ReverseClient(
                remoteAdmin.networkClient(),
                new Some<>(remoteAdmin.metadataManager()),
                new Some<>(remoteAdmin),
                remoteAdmin.clientId());
        reverseConnectionAdmin_$eq(new Some<>(adminReverseClient));
    }

    /**
     * @return the network client of the current reverse connection admin, or an empty option when
     *         no admin is set
     */
    public Option<NetworkClient> reverseConnectionClient() {
        Option<ReverseClient> admin = reverseConnectionAdmin();
        return admin.map(ReverseClient::networkClient);
    }

    /**
     * @return the persistent connection counter when this broker is the active controller,
     *         otherwise {@code 0}
     */
    @Override
    public int persistentConnectionCount() {
        return controller().isActive() ? persistentConnections().get() : 0;
    }

    /**
     * @return the current value of the active reverse connection counter
     */
    @Override
    public int reverseConnectionCount() {
        AtomicInteger active = activeReverseConnections();
        return active.get();
    }

    /**
     * Synthetic lambda body: fails the given future with the given throwable.
     *
     * @param th the failure to propagate
     * @param completableFuture the future to complete exceptionally
     * @return {@code true} if this call moved the future to a completed state, {@code false} if it
     *         was already completed
     */
    public static final /* synthetic */ boolean $anonfun$initiateReverseConnections$5(Throwable th, CompletableFuture completableFuture) {
        boolean transitioned = completableFuture.completeExceptionally(th);
        return transitioned;
    }

    /**
     * Completion callback for a reverse connection request.
     *
     * <p>On success only a debug line is logged. On failure a warning is logged, the pending
     * entry for {@code i} is removed from {@code connectionRequests}, and the client's network
     * client is told that the reverse connection to {@code node} failed.
     *
     * @param th the failure, or {@code null} if the request succeeded
     * @param i the request id
     * @param reverseClient the client that was waiting for the connection
     * @param node the node the connection was requested for
     */
    public final void onCompletion$1(Throwable th, int i, ReverseClient reverseClient, Node node) {
        if (th != null) {
            warn(() -> "Failed to create reverse connection for requestId=" + i, () -> th);
            connectionRequests().remove(BoxesRunTime.boxToInteger(i));
            reverseClient.networkClient().processReverseConnectionFailure(node);
        } else {
            debug(() -> "Reverse connection has been created for requestId=" + i);
        }
    }

    /**
     * Synthetic lambda body: closes the given admin client through {@code CoreUtils.swallow},
     * passing the manager as the second argument and the default third argument.
     *
     * @param clusterLinkDestConnectionManager the manager passed to {@code swallow}
     * @param clusterLinkAdminClient the admin client to close
     */
    public static final /* synthetic */ void $anonfun$closeReverseConnectionAdmin$2(ClusterLinkDestConnectionManager clusterLinkDestConnectionManager, ClusterLinkAdminClient clusterLinkAdminClient) {
        CoreUtils$ coreUtils = CoreUtils$.MODULE$;
        coreUtils.swallow(() -> {
            clusterLinkAdminClient.close();
        }, clusterLinkDestConnectionManager, coreUtils.swallow$default$3());
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ClusterLinkDestConnectionManager(ClusterLinkData clusterLinkData, ClusterLinkConfig clusterLinkConfig, String str, Option<ClientInterceptor> option, ClusterLinkMetrics clusterLinkMetrics, Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> function2, ConfluentAdmin confluentAdmin, KafkaController kafkaController, KafkaConfig kafkaConfig, Time time) {
        super(clusterLinkData, clusterLinkConfig, str, clusterLinkMetrics);
        this.clientInterceptor = option;
        this.metrics = clusterLinkMetrics;
        this.remoteAdminFactory = function2;
        this.localAdmin = confluentAdmin;
        this.controller = kafkaController;
        this.brokerConfig = kafkaConfig;
        this.time = time;
        this.connectionRequests = new ConcurrentHashMap<>();
        this.nextReverseRequestId = ClusterLinkDestConnectionManager$.MODULE$.NextReverseRequestId();
        this.persistentConnections = new AtomicInteger();
        this.activeReverseConnections = new AtomicInteger();
        this.reverseConnectionAdmin = None$.MODULE$;
        logIdent_$eq(new StringBuilder(44).append("[ClusterLinkDestConnectionManager-").append(super.linkData().linkName()).append("-broker-").append(kafkaConfig.brokerId()).append("] ").toString());
    }
}
