package org.apache.kafka.connect.mirror;

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorHerder.class */
public class MirrorHerder extends DistributedHerder {
    private static final Logger log = LoggerFactory.getLogger(MirrorHerder.class);
    private final MirrorMakerConfig config;
    private final SourceAndTarget sourceAndTarget;
    private boolean wasLeader;

    public MirrorHerder(MirrorMakerConfig mirrorMakerConfig, SourceAndTarget sourceAndTarget, DistributedConfig distributedConfig, Time time, Worker worker, String str, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String str2, RestClient restClient, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, List<String> list, AutoCloseable... autoCloseableArr) {
        super(distributedConfig, time, worker, str, statusBackingStore, configBackingStore, str2, restClient, connectorClientConfigOverridePolicy, list, autoCloseableArr);
        this.config = mirrorMakerConfig;
        this.sourceAndTarget = sourceAndTarget;
    }

    protected void rebalanceSuccess() {
        if (!isLeader()) {
            this.wasLeader = false;
            return;
        }
        if (!this.wasLeader) {
            log.info("This node {} is now a leader for {}. Configuring connectors...", this, this.sourceAndTarget);
            configureConnectors();
        }
        this.wasLeader = true;
    }

    private void configureConnectors() {
        MirrorMaker.CONNECTOR_CLASSES.forEach(this::maybeConfigureConnector);
    }

    private void maybeConfigureConnector(Class<?> cls) {
        Map<String, String> connectorBaseConfig = this.config.connectorBaseConfig(this.sourceAndTarget, cls);
        Map connectorConfig = this.configState.connectorConfig(cls.getSimpleName());
        if (connectorConfig == null || !connectorConfig.equals(connectorBaseConfig)) {
            configureConnector(cls.getSimpleName(), connectorBaseConfig);
        } else {
            log.info("This node is a leader for {} and configuration for {} is already up to date.", this.sourceAndTarget, cls.getSimpleName());
        }
    }

    private void configureConnector(String str, Map<String, String> map) {
        putConnectorConfig(str, map, true, (th, created) -> {
            if (th == null) {
                log.info("{} connector configured for {}.", str, this.sourceAndTarget);
            } else if (th instanceof NotLeaderException) {
                log.info("This node lost leadership for {} while trying to update the connector configuration for {}. Using existing connector configuration.", str, this.sourceAndTarget);
            } else {
                log.error("Failed to configure {} connector for {}", new Object[]{str, this.sourceAndTarget, th});
            }
        });
    }
}
