package kafka.server.link;

import com.typesafe.scalalogging.Logger;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.Log4jControllerRegistration$;
import kafka.utils.Logging;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterLinkPausedException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.metrics.Sensor;
import org.slf4j.event.Level;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ClusterLinkConnectionManager.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005]g!\u0002\u0015*\u0003\u0003\u0001\u0004\u0002C#\u0001\u0005\u000b\u0007I\u0011\u0001$\t\u00115\u0003!\u0011!Q\u0001\n\u001dC\u0001B\u0014\u0001\u0003\u0002\u0003\u0006Ia\u0014\u0005\t%\u0002\u0011)\u0019!C\u0001'\"Aq\f\u0001B\u0001B\u0003%A\u000b\u0003\u0005a\u0001\t\u0005\t\u0015!\u0003b\u0011!!\u0007A!A!\u0002\u0013)\u0007\u0002\u00035\u0001\u0005\u0003\u0005\u000b\u0011B5\t\u000b5\u0004A\u0011\u00018\t\u000fY\u0004!\u0019!C\to\"9\u0011q\u0001\u0001!\u0002\u0013A\b\"CA\u0005\u0001\t\u0007I\u0011CA\u0006\u0011!\ti\u0002\u0001Q\u0001\n\u00055\u0001\"CA\u0010\u0001\u0001\u0007I\u0011BA\u0011\u0011%\t\u0019\u0003\u0001a\u0001\n\u0013\t)\u0003C\u0004\u00022\u0001\u0001\u000b\u0015B(\t\u0015\u0005m\u0002\u00011A\u0005\u0012%\ni\u0004\u0003\u0006\u0002F\u0001\u0001\r\u0011\"\u0005*\u0003\u000fB\u0001\"a\u0013\u0001A\u0003&\u0011q\b\u0005\n\u0003\u001f\u0002\u0001\u0019!C\t\u0003{A\u0011\"!\u0015\u0001\u0001\u0004%\t\"a\u0015\t\u0011\u0005]\u0003\u0001)Q\u0005\u0003\u007fA\u0011\"a\u0017\u0001\u0005\u0004%\t!!\u0018\t\u0011\u0005\r\u0005\u0001)A\u0005\u0003?Bq!!\"\u0001\t\u0003\n9\t\u0003\u0005\u0002\n\u0002!\t%KAF\u0011\u001d\t\t\u000b\u0001C!\u0003GCq!!+\u0001\t\u0003\n9\tC\u0004\u0002,\u0002!\t\"a\"\t\u000f\u00055\u0006\u0001\"\u0011\u0002>!9\u0011q\u0016\u0001\u0005B\u0005\u0005\u0002bBAY\u0001\u0011\u0005\u00111\u0017\u0005\b\u0003w\u0003A\u0011CAD\u0011\u001d\ti\f\u0001C\t\u0003\u000fCq!a0\u0001\r#\t9\tC\u0004\u0002B\u00021\t\"a\"\t\u000f\u0005\r\u0007\u0001\"\u0005\u0002\b\"9\u0011Q\u0019\u0001\u0005B\u0005u\u0002bBAd\u0001\u0011\u0005\u0013\u0011\u001a\u0002\u001d\u00072,8\u000f^3s\u0019&t7nQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3s\u0015\tQ3&\u0001\u0003mS:\\'B\u0001\u0017.\u0003\u0019\u0019XM\u001d<fe*\ta&A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\t\u0001\ttg\u0010\t\u0003eUj\u0011a\r\u0006\u0002i\u0005)1oY1mC&\u0011ag\r\u0002\u0007\u0003:L(+\u001a4\u0011\u0005abdBA\u001d;\u001b\u0005I\u0013BA\u001e*\u0003I\u0019E.^:uKJd\u0015N\\6GC\u000e$xN]=\n\u0005ur$!E\"p]:,7\r^5p]6\u000bg.Y4fe*\u00111(\u000b\t\u0003\u0001\u000ek\u0011!\u0011\u0006\u0003\u00056\nQ!\u001e;jYNL!\u0001R!\u0003\u000f1{wmZ5oO\u0006AA.\u001b8l\t\u0006$\u0018-F\u0001H!\tA5*D\u0001J\u0015\tQU&\u0001\u0002{W&\u0011A*\u0013\u0002\u0010\u00072,8\u000f^3s\u0019&t7\u000eR1uC\u0006IA.\u001b8l\t\u0006$\u0018\rI\u0001\u000eS:LG/[1m\u0007>tg-[4\u0011\u0005e\u0002\u0016BA)*\u0005E\u0019E.^:uKJd\u0015N\\6D_:4\u0017nZ\u0001\u0014Y>\u001c\u0017\r\u001c'pO&\u001c\u0017\r\\\"mkN$XM]\u000b\u0002)B\u0011Q\u000b\u0018\b\u0003-j\u0003\"aV\u001a\u000e\u0003aS!!W\u0018\u0002\rq\u0012xn\u001c;?\u0013\tY6'\u0001\u0004Qe\u0016$WMZ\u0005\u0003;z\u0013aa\u0015;sS:<'BA.4\u0003QawnY1m\u0019><\u0017nY1m\u00072,8\u000f^3sA\u0005yQ.\u001a;bI\u0006$\u0018-T1oC\u001e,'\u000f\u0005\u0002:E&\u00111-\u000b\u0002\u001b\u00072,8\u000f^3s\u0019&t7.T3uC\u0012\fG/Y'b]\u0006<WM]\u0001\b[\u0016$(/[2t!\tId-\u0003\u0002hS\t\u00112\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$(/[2t\u00031\u0011'o\\6fe\u000e{gNZ5h!\tQ7.D\u0001,\u0013\ta7FA\u0006LC\u001a\\\u0017mQ8oM&<\u0017A\u0002\u001fj]&$h\bF\u0004paF\u00148\u000f^;\u0011\u0005e\u0002\u0001\"B#\n\u0001\u00049\u0005\"\u0002(\n\u0001\u0004y\u0005\"\u0002*\n\u0001\u0004!\u0006\"\u00021\n\u0001\u0004\t\u0007\"\u00023\n\u0001\u0004)\u0007\"\u00025\n\u0001\u0004I\u0017A\u00027j].LE-F\u0001y!\rI\u00181A\u0007\u0002u*\u00111\u0010`\u0001\u0007G>lWn\u001c8\u000b\u00059j(B\u0001@��\u0003\u0019\t\u0007/Y2iK*\u0011\u0011\u0011A\u0001\u0004_J<\u0017bAA\u0003u\n!Q+^5e\u0003\u001da\u0017N\\6JI\u0002\nqb\u001d;bi\u0016\u001c\u0005.\u00198hK2{7m[\u000b\u0003\u0003\u001b\u0001B!a\u0004\u0002\u001a5\u0011\u0011\u0011\u0003\u0006\u0005\u0003'\t)\"\u0001\u0003mC:<'BAA\f\u0003\u0011Q\u0017M^1\n\t\u0005m\u0011\u0011\u0003\u0002\u0007\u001f\nTWm\u0019;\u0002!M$\u0018\r^3DQ\u0006tw-\u001a'pG.\u0004\u0013!E2mkN$XM\u001d'j].\u001cuN\u001c4jOV\tq*A\u000bdYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e|F%Z9\u0015\t\u0005\u001d\u0012Q\u0006\t\u0004e\u0005%\u0012bAA\u0016g\t!QK\\5u\u0011!\tycDA\u0001\u0002\u0004y\u0015a\u0001=%c\u0005\u00112\r\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4!Q\r\u0001\u0012Q\u0007\t\u0004e\u0005]\u0012bAA\u001dg\tAao\u001c7bi&dW-\u0001\rsKZ,'o]3D_:tWm\u0019;j_:,e.\u00192mK\u0012,\"!a\u0010\u0011\u0007I\n\t%C\u0002\u0002DM\u0012qAQ8pY\u0016\fg.\u0001\u000fsKZ,'o]3D_:tWm\u0019;j_:,e.\u00192mK\u0012|F%Z9\u0015\t\u0005\u001d\u0012\u0011\n\u0005\n\u0003_\u0011\u0012\u0011!a\u0001\u0003\u007f\t\u0011D]3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u000b:\f'\r\\3eA!\u001a1#!\u000e\u0002\u0011%\u001c\u0018i\u0019;jm\u0016\fA\"[:BGRLg/Z0%KF$B!a\n\u0002V!I\u0011qF\u000b\u0002\u0002\u0003\u0007\u0011qH\u0001\nSN\f5\r^5wK\u0002B3AFA\u001b\u0003-i\u0017\r\u001f'pO2+g/\u001a7\u0016\u0005\u0005}\u0003CBA1\u0003_\n\u0019(\u0004\u0002\u0002d)!\u0011QMA4\u0003\u0019\tGo\\7jG*!\u0011\u0011NA6\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0005\u0003[\n)\"\u0001\u0003vi&d\u0017\u0002BA9\u0003G\u0012q\"\u0011;p[&\u001c'+\u001a4fe\u0016t7-\u001a\t\u0005\u0003k\ny(\u0004\u0002\u0002x)!\u0011\u0011PA>\u0003\u0015)g/\u001a8u\u0015\r\tih`\u0001\u0006g24GG[\u0005\u0005\u0003\u0003\u000b9HA\u0003MKZ,G.\u0001\u0007nCbdun\u001a'fm\u0016d\u0007%A\u0004ti\u0006\u0014H/\u001e9\u0015\u0005\u0005\u001d\u0012a\u0003:fG>tg-[4ve\u0016$b!a\n\u0002\u000e\u0006E\u0005BBAH5\u0001\u0007q*A\u0005oK^\u001cuN\u001c4jO\"9\u00111\u0013\u000eA\u0002\u0005U\u0015aC;qI\u0006$X\rZ&fsN\u0004R!a&\u0002\u001eRk!!!'\u000b\u0007\u0005m5'\u0001\u0006d_2dWm\u0019;j_:LA!a(\u0002\u001a\n\u00191+\u001a;\u0002)=t\u0017I^1jY\u0006\u0014\u0017\u000e\\5us\u000eC\u0017M\\4f)\u0011\t9#!*\t\u000f\u0005\u001d6\u00041\u0001\u0002@\u0005Y\u0011n]!wC&d\u0017M\u00197f\u0003!\u0019\b.\u001e;e_^t\u0017\u0001D2m_N,7\t\\5f]R\u001c\u0018AB1di&4X-A\u0007dkJ\u0014XM\u001c;D_:4\u0017nZ\u0001\rY&t7n\u00117vgR,'o]\u000b\u0003\u0003k\u0003R!a&\u00028RKA!!/\u0002\u001a\n\u00191+Z9\u0002?\u0015t7/\u001e:f%\u00164XM]:f\u0007>tg.Z2uS>t7/\u00128bE2,G-A\u000esKN,GOU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u0001\u001dGJ,\u0017\r^3SKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o\u0003m\u0019Gn\\:f%\u00164XM]:f\u0007>tg.Z2uS>t\u0017\tZ7j]\u0006)R\u000f\u001d3bi\u0016\f5\r^5wK2Kgn[\"pk:$\u0018!E5t\u0019&t7nQ8pe\u0012Lg.\u0019;pe\u0006\tB.\u001b8l\u0007>|'\u000fZ5oCR|'/\u00133\u0016\u0005\u0005-\u0007#\u0002\u001a\u0002N\u0006E\u0017bAAhg\t1q\n\u001d;j_:\u00042AMAj\u0013\r\t)n\r\u0002\u0004\u0013:$\b")
/* loaded from: input_file:kafka/server/link/ClusterLinkConnectionManager.class */
public abstract class ClusterLinkConnectionManager implements ClusterLinkFactory.ConnectionManager, Logging {
    private final ClusterLinkData linkData;
    private final String localLogicalCluster;
    private final ClusterLinkMetadataManager metadataManager;
    private final ClusterLinkMetrics metrics;
    private final KafkaConfig brokerConfig;
    private final Uuid linkId;
    private final Object stateChangeLock;
    private volatile ClusterLinkConfig clusterLinkConfig;
    private volatile boolean reverseConnectionEnabled;
    private volatile boolean isActive;
    private final AtomicReference<Level> maxLogLevel;
    private Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    @Override // kafka.utils.Logging
    public String loggerName() {
        String loggerName;
        loggerName = loggerName();
        return loggerName;
    }

    @Override // kafka.utils.Logging
    public String msgWithLogIdent(String str) {
        String msgWithLogIdent;
        msgWithLogIdent = msgWithLogIdent(str);
        return msgWithLogIdent;
    }

    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0) {
        trace(function0);
    }

    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0, Function0<Throwable> function02) {
        trace(function0, function02);
    }

    @Override // kafka.utils.Logging
    public boolean isDebugEnabled() {
        boolean isDebugEnabled;
        isDebugEnabled = isDebugEnabled();
        return isDebugEnabled;
    }

    @Override // kafka.utils.Logging
    public boolean isTraceEnabled() {
        boolean isTraceEnabled;
        isTraceEnabled = isTraceEnabled();
        return isTraceEnabled;
    }

    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0) {
        debug(function0);
    }

    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0, Function0<Throwable> function02) {
        debug(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void info(Function0<String> function0) {
        info(function0);
    }

    @Override // kafka.utils.Logging
    public void info(Function0<String> function0, Function0<Throwable> function02) {
        info(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0) {
        warn(function0);
    }

    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0, Function0<Throwable> function02) {
        warn(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void error(Function0<String> function0) {
        error(function0);
    }

    @Override // kafka.utils.Logging
    public void error(Function0<String> function0, Function0<Throwable> function02) {
        error(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0) {
        fatal(function0);
    }

    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0, Function0<Throwable> function02) {
        fatal(function0, function02);
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void onControllerChange(boolean z) {
        onControllerChange(z);
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void onLinkMetadataPartitionLeaderChange() {
        onLinkMetadataPartitionLeaderChange();
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public int persistentConnectionCount() {
        int persistentConnectionCount;
        persistentConnectionCount = persistentConnectionCount();
        return persistentConnectionCount;
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public int reverseConnectionCount() {
        int reverseConnectionCount;
        reverseConnectionCount = reverseConnectionCount();
        return reverseConnectionCount;
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    /* renamed from: lazyResources */
    public Seq<LazyResource<?>> mo1158lazyResources() {
        Seq<LazyResource<?>> mo1158lazyResources;
        mo1158lazyResources = mo1158lazyResources();
        return mo1158lazyResources;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [kafka.server.link.ClusterLinkConnectionManager] */
    private Logger logger$lzycompute() {
        Logger logger;
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                logger = logger();
                this.logger = logger;
                r0 = this;
                r0.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    @Override // kafka.utils.Logging
    public Logger logger() {
        return !this.bitmap$0 ? logger$lzycompute() : this.logger;
    }

    @Override // kafka.utils.Logging
    public String logIdent() {
        return this.logIdent;
    }

    @Override // kafka.utils.Logging
    public void logIdent_$eq(String str) {
        this.logIdent = str;
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public ClusterLinkData linkData() {
        return this.linkData;
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public String localLogicalCluster() {
        return this.localLogicalCluster;
    }

    public Uuid linkId() {
        return this.linkId;
    }

    public Object stateChangeLock() {
        return this.stateChangeLock;
    }

    private ClusterLinkConfig clusterLinkConfig() {
        return this.clusterLinkConfig;
    }

    private void clusterLinkConfig_$eq(ClusterLinkConfig clusterLinkConfig) {
        this.clusterLinkConfig = clusterLinkConfig;
    }

    public boolean reverseConnectionEnabled() {
        return this.reverseConnectionEnabled;
    }

    public void reverseConnectionEnabled_$eq(boolean z) {
        this.reverseConnectionEnabled = z;
    }

    public boolean isActive() {
        return this.isActive;
    }

    public void isActive_$eq(boolean z) {
        this.isActive = z;
    }

    public AtomicReference<Level> maxLogLevel() {
        return this.maxLogLevel;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void startup() {
        ?? stateChangeLock = stateChangeLock();
        synchronized (stateChangeLock) {
            info(() -> {
                return "Cluster link connection manager has started up.";
            });
            if (Predef$.MODULE$.Boolean2boolean(clusterLinkConfig().clusterLinkPaused())) {
                isActive_$eq(false);
            } else {
                isActive_$eq(true);
                updateActiveLinkCount();
            }
            resetReverseConnectionAdmin();
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void reconfigure(ClusterLinkConfig clusterLinkConfig, Set<String> set) {
        ?? stateChangeLock = stateChangeLock();
        synchronized (stateChangeLock) {
            debug(() -> {
                return new StringBuilder(74).append("Reconfiguring link connection manager with new configs updated=").append(set).append(" newConfig=").append(clusterLinkConfig.values()).toString();
            });
            clusterLinkConfig_$eq(clusterLinkConfig);
            if (Predef$.MODULE$.Boolean2boolean(clusterLinkConfig.clusterLinkPaused())) {
                if (isActive()) {
                    info(() -> {
                        return "Closing clients from cluster link connection manager because link has been paused";
                    });
                }
                isActive_$eq(false);
                closeClients();
            } else {
                isActive_$eq(true);
                resetReverseConnectionAdmin();
            }
            updateActiveLinkCount();
            debug(() -> {
                return "Completed reconfiguration of cluster link";
            });
        }
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void onAvailabilityChange(boolean z) {
        if (z) {
            maxLogLevel().set(null);
        } else {
            maxLogLevel().set(Level.DEBUG);
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void shutdown() {
        ?? stateChangeLock = stateChangeLock();
        synchronized (stateChangeLock) {
            isActive_$eq(false);
            closeClients();
            mo1158lazyResources().foreach(lazyResource -> {
                lazyResource.shutdown();
                return BoxedUnit.UNIT;
            });
            info(() -> {
                return new StringBuilder(64).append("Shutdown of ClusterLinkConnectionManager with cluster link data ").append(this.linkData()).toString();
            });
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    public void closeClients() {
        ?? stateChangeLock = stateChangeLock();
        synchronized (stateChangeLock) {
            debug(() -> {
                return "Closing clients ";
            });
            closeReverseConnectionAdmin();
            updateActiveLinkCount();
        }
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public boolean active() {
        return isActive();
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public ClusterLinkConfig currentConfig() {
        return clusterLinkConfig();
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public Seq<String> linkClusters() {
        return (Seq) new $colon.colon(localLogicalCluster(), Nil$.MODULE$).$plus$plus(Option$.MODULE$.option2Iterable(linkData().clusterId()).toSeq());
    }

    public void ensureReverseConnectionsEnabled() {
        if (!isActive() || Predef$.MODULE$.Boolean2boolean(clusterLinkConfig().clusterLinkPaused())) {
            throw new ClusterLinkPausedException(new StringBuilder(36).append("Cluster link ").append(linkData().linkName()).append(" is not active,").append(" paused=").append(clusterLinkConfig().clusterLinkPaused()).toString());
        }
        if (!reverseConnectionEnabled()) {
            throw new InvalidRequestException(new StringBuilder(46).append("Cluster link '").append(linkData().linkName()).append("' is not a source initiated link").toString());
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:14:0x003f, code lost:
    
        if (r1.equals(r2) == false) goto L15;
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x0065, code lost:
    
        r1 = true;
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x0062, code lost:
    
        if (r1.equals(r2) == false) goto L23;
     */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void resetReverseConnectionAdmin() {
        /*
            r4 = this;
            r0 = r4
            java.lang.Object r0 = r0.stateChangeLock()
            r1 = r0
            r5 = r1
            monitor-enter(r0)
            r0 = r4
            kafka.server.link.ClusterLinkConfig r0 = r0.currentConfig()     // Catch: java.lang.Throwable -> L96
            r6 = r0
            r0 = r4
            r1 = r4
            boolean r1 = r1.isActive()     // Catch: java.lang.Throwable -> L96
            if (r1 == 0) goto L69
            scala.Predef$ r1 = scala.Predef$.MODULE$     // Catch: java.lang.Throwable -> L96
            r2 = r6
            java.lang.Boolean r2 = r2.clusterLinkPaused()     // Catch: java.lang.Throwable -> L96
            boolean r1 = r1.Boolean2boolean(r2)     // Catch: java.lang.Throwable -> L96
            if (r1 != 0) goto L69
            r1 = r6
            io.confluent.kafka.link.ClusterLinkConfig$LinkMode r1 = r1.linkMode()     // Catch: java.lang.Throwable -> L96
            boolean r1 = r1.mayActAsDestination()     // Catch: java.lang.Throwable -> L96
            if (r1 == 0) goto L42
            r1 = r6
            kafka.server.link.ConnectionMode r1 = r1.connectionMode()     // Catch: java.lang.Throwable -> L96
            kafka.server.link.ConnectionMode$Inbound$ r2 = kafka.server.link.ConnectionMode$Inbound$.MODULE$     // Catch: java.lang.Throwable -> L96
            r7 = r2
            r2 = r1
            if (r2 != 0) goto L3b
        L38:
            goto L42
        L3b:
            r2 = r7
            boolean r1 = r1.equals(r2)     // Catch: java.lang.Throwable -> L96
            if (r1 != 0) goto L65
        L42:
            r1 = r6
            io.confluent.kafka.link.ClusterLinkConfig$LinkMode r1 = r1.linkMode()     // Catch: java.lang.Throwable -> L96
            boolean r1 = r1.mayActAsSource()     // Catch: java.lang.Throwable -> L96
            if (r1 == 0) goto L69
            r1 = r6
            kafka.server.link.ConnectionMode r1 = r1.remoteLinkConnectionMode()     // Catch: java.lang.Throwable -> L96
            kafka.server.link.ConnectionMode$Inbound$ r2 = kafka.server.link.ConnectionMode$Inbound$.MODULE$     // Catch: java.lang.Throwable -> L96
            r8 = r2
            r2 = r1
            if (r2 != 0) goto L5d
        L5a:
            goto L69
        L5d:
            r2 = r8
            boolean r1 = r1.equals(r2)     // Catch: java.lang.Throwable -> L96
            if (r1 == 0) goto L69
        L65:
            r1 = 1
            goto L6a
        L69:
            r1 = 0
        L6a:
            r0.reverseConnectionEnabled_$eq(r1)     // Catch: java.lang.Throwable -> L96
            r0 = r4
            r0.closeReverseConnectionAdmin()     // Catch: java.lang.Throwable -> L96
            r0 = r4
            boolean r0 = r0.reverseConnectionEnabled()     // Catch: java.lang.Throwable -> L96
            if (r0 == 0) goto L89
            r0 = r4
            r1 = r4
            void r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$resetReverseConnectionAdmin$1(r1);
            }     // Catch: java.lang.Throwable -> L96
            r0.debug(r1)     // Catch: java.lang.Throwable -> L96
            r0 = r4
            r0.createReverseConnectionAdmin()     // Catch: java.lang.Throwable -> L96
            goto L93
        L89:
            r0 = r4
            r1 = r6
            void r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$resetReverseConnectionAdmin$2(r1);
            }     // Catch: java.lang.Throwable -> L96
            r0.debug(r1)     // Catch: java.lang.Throwable -> L96
        L93:
            r0 = r5
            monitor-exit(r0)
            return
        L96:
            r1 = move-exception
            monitor-exit(r1)
            throw r0
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.server.link.ClusterLinkConnectionManager.resetReverseConnectionAdmin():void");
    }

    public abstract void createReverseConnectionAdmin();

    public abstract void closeReverseConnectionAdmin();

    public void updateActiveLinkCount() {
        int i = isActive() ? 1 : 0;
        Option$.MODULE$.apply(this.metrics.activeLinkCountSensor()).foreach(sensor -> {
            $anonfun$updateActiveLinkCount$1(i, sensor);
            return BoxedUnit.UNIT;
        });
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public boolean isLinkCoordinator() {
        return this.metadataManager.isLinkCoordinator(linkData().linkName());
    }

    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public Option<Object> linkCoordinatorId() {
        return this.metadataManager.linkCoordinator(linkData().linkName(), this.brokerConfig.interBrokerListenerName()).map(node -> {
            return BoxesRunTime.boxToInteger(node.id());
        });
    }

    public static final /* synthetic */ void $anonfun$updateActiveLinkCount$1(int i, Sensor sensor) {
        sensor.record(i);
    }

    public ClusterLinkConnectionManager(ClusterLinkData clusterLinkData, ClusterLinkConfig clusterLinkConfig, String str, ClusterLinkMetadataManager clusterLinkMetadataManager, ClusterLinkMetrics clusterLinkMetrics, KafkaConfig kafkaConfig) {
        this.linkData = clusterLinkData;
        this.localLogicalCluster = str;
        this.metadataManager = clusterLinkMetadataManager;
        this.metrics = clusterLinkMetrics;
        this.brokerConfig = kafkaConfig;
        Log4jControllerRegistration$ log4jControllerRegistration$ = Log4jControllerRegistration$.MODULE$;
        this.linkId = new Uuid(clusterLinkData.linkId().getMostSignificantBits(), clusterLinkData.linkId().getLeastSignificantBits());
        this.stateChangeLock = new Object();
        this.clusterLinkConfig = clusterLinkConfig;
        this.reverseConnectionEnabled = false;
        this.isActive = true;
        this.maxLogLevel = new AtomicReference<>();
    }
}
