package org.apache.gobblin.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareHistogram;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixMultiManager.class */
public class GobblinHelixMultiManager implements StandardMetricsBridge {
    private static final Logger log = LoggerFactory.getLogger(GobblinHelixMultiManager.class);
    private boolean dedicatedManagerCluster;
    private boolean dedicatedTaskDriverCluster;
    boolean isStandaloneMode;
    private final GobblinClusterManager.StopStatus stopStatus;
    private final Config config;
    private final EventBus eventBus;
    private final HelixManagerMetrics metrics;
    private final MultiTypeMessageHandlerFactory userDefinedMessageHandlerFactory;
    private HelixManager managerClusterHelixManager = null;
    private HelixManager jobClusterHelixManager = null;
    private HelixAdmin jobClusterHelixAdmin = null;
    private Optional<HelixManager> taskDriverHelixManager = Optional.empty();
    private Optional<HelixManager> jobClusterController = Optional.empty();
    private Optional<HelixManager> taskDriverClusterController = Optional.empty();
    private boolean dedicatedJobClusterController = true;
    boolean isLeader = false;
    private final List<LeadershipChangeAwareComponent> leadershipChangeAwareComponents = Lists.newArrayList();

    /* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixMultiManager$ControllerShutdownMessageHandlerFactory.class */
    private class ControllerShutdownMessageHandlerFactory implements MultiTypeMessageHandlerFactory {

        /* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixMultiManager$ControllerShutdownMessageHandlerFactory$ControllerShutdownMessageHandler.class */
        private class ControllerShutdownMessageHandler extends MessageHandler {
            public ControllerShutdownMessageHandler(Message message, NotificationContext notificationContext) {
                super(message, notificationContext);
            }

            public HelixTaskResult handleMessage() {
                String msgSubType = this._message.getMsgSubType();
                Preconditions.checkArgument(msgSubType.equalsIgnoreCase(HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString()), String.format("Unknown %s message subtype: %s", GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, msgSubType));
                HelixTaskResult helixTaskResult = new HelixTaskResult();
                if (GobblinHelixMultiManager.this.stopStatus.isStopInProgress()) {
                    helixTaskResult.setSuccess(true);
                    return helixTaskResult;
                }
                GobblinHelixMultiManager.log.info("Handling message " + HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString());
                MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)).scheduleAtFixedRate(new Runnable() { // from class: org.apache.gobblin.cluster.GobblinHelixMultiManager.ControllerShutdownMessageHandlerFactory.ControllerShutdownMessageHandler.1
                    @Override // java.lang.Runnable
                    public void run() {
                        HelixManager manager = ControllerShutdownMessageHandler.this._notificationContext.getManager();
                        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
                        if (helixDataAccessor.getProperty(ControllerShutdownMessageHandler.this._message.getKey(helixDataAccessor.keyBuilder(), manager.getInstanceName())) == null) {
                            GobblinHelixMultiManager.this.eventBus.post(new ClusterManagerShutdownRequest());
                        }
                    }
                }, 0L, 1L, TimeUnit.SECONDS);
                helixTaskResult.setSuccess(true);
                return helixTaskResult;
            }

            public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
                GobblinHelixMultiManager.log.error(String.format("Failed to handle message with exception %s, error code %s, error type %s", exc, errorCode, errorType));
            }
        }

        private ControllerShutdownMessageHandlerFactory() {
        }

        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            return new ControllerShutdownMessageHandler(message, notificationContext);
        }

        public String getMessageType() {
            return GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE;
        }

        public List<String> getMessageTypes() {
            return Collections.singletonList(getMessageType());
        }

        public void reset() {
        }
    }

    /* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixMultiManager$ControllerUserDefinedMessageHandlerFactory.class */
    static class ControllerUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {

        /* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixMultiManager$ControllerUserDefinedMessageHandlerFactory$ControllerUserDefinedMessageHandler.class */
        private static class ControllerUserDefinedMessageHandler extends MessageHandler {
            public ControllerUserDefinedMessageHandler(Message message, NotificationContext notificationContext) {
                super(message, notificationContext);
            }

            public HelixTaskResult handleMessage() {
                GobblinHelixMultiManager.log.warn(String.format("No handling setup for %s message of subtype: %s", Message.MessageType.USER_DEFINE_MSG.toString(), this._message.getMsgSubType()));
                HelixTaskResult helixTaskResult = new HelixTaskResult();
                helixTaskResult.setSuccess(true);
                return helixTaskResult;
            }

            public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
                GobblinHelixMultiManager.log.error(String.format("Failed to handle message with exception %s, error code %s, error type %s", exc, errorCode, errorType));
            }
        }

        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            return new ControllerUserDefinedMessageHandler(message, notificationContext);
        }

        public String getMessageType() {
            return Message.MessageType.USER_DEFINE_MSG.toString();
        }

        public List<String> getMessageTypes() {
            return Collections.singletonList(getMessageType());
        }

        public void reset() {
        }
    }

    /* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixMultiManager$GobblinLiveInstanceChangeListener.class */
    private static class GobblinLiveInstanceChangeListener implements LiveInstanceChangeListener {
        private GobblinLiveInstanceChangeListener() {
        }

        public void onLiveInstanceChange(List<LiveInstance> list, NotificationContext notificationContext) {
            if (GobblinHelixMultiManager.log.isDebugEnabled()) {
                Iterator<LiveInstance> it = list.iterator();
                while (it.hasNext()) {
                    GobblinHelixMultiManager.log.debug("Live Helix participant instance: " + it.next().getInstanceName());
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixMultiManager$HelixManagerMetrics.class */
    private static class HelixManagerMetrics extends StandardMetricsBridge.StandardMetrics {
        public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange";
        private final ContextAwareHistogram clusterLeadershipChange;

        public HelixManagerMetrics(MetricContext metricContext, Config config) {
            this.clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, ConfigUtils.getInt(config, "metrics.timer.window.size.in.minutes", 15).intValue(), TimeUnit.MINUTES);
            this.contextAwareMetrics.add(this.clusterLeadershipChange);
        }

        public String getName() {
            return GobblinClusterManager.class.getName();
        }
    }

    public GobblinHelixMultiManager(Config config, Function<Void, MultiTypeMessageHandlerFactory> function, EventBus eventBus, GobblinClusterManager.StopStatus stopStatus) {
        this.dedicatedManagerCluster = false;
        this.dedicatedTaskDriverCluster = false;
        this.isStandaloneMode = false;
        this.config = config;
        this.eventBus = eventBus;
        this.stopStatus = stopStatus;
        this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY, false);
        this.metrics = new HelixManagerMetrics(Instrumented.getMetricContext(ConfigUtils.configToState(config), getClass()), this.config);
        this.dedicatedManagerCluster = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED, false);
        this.dedicatedTaskDriverCluster = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_ENABLED, false);
        this.userDefinedMessageHandlerFactory = function.apply(null);
        initialize();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addLeadershipChangeAwareComponent(LeadershipChangeAwareComponent leadershipChangeAwareComponent) {
        this.leadershipChangeAwareComponents.add(leadershipChangeAwareComponent);
    }

    protected static HelixManager buildHelixManager(Config config, String str, InstanceType instanceType) {
        Preconditions.checkArgument(config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
        String string = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
        log.info("Using ZooKeeper connection string: " + string);
        return GobblinHelixManagerFactory.getZKHelixManager(config.getString(str), ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, GobblinClusterManager.class.getSimpleName()), instanceType, string);
    }

    protected static HelixAdmin buildHelixAdmin(Config config) {
        return new ZKHelixAdmin.Builder().setZkAddress(config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY)).build();
    }

    public void initialize() {
        if (!this.dedicatedManagerCluster) {
            log.info("We will use same cluster to manage GobblinClusterManager and job distribution.");
            this.managerClusterHelixManager = buildHelixManager(this.config, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED, false) ? InstanceType.PARTICIPANT : InstanceType.CONTROLLER);
            this.jobClusterHelixManager = this.managerClusterHelixManager;
            this.jobClusterHelixAdmin = buildHelixAdmin(this.config);
            return;
        }
        Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY));
        log.info("We will use separate clusters to manage GobblinClusterManager and job distribution.");
        this.managerClusterHelixManager = buildHelixManager(this.config, GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, InstanceType.CONTROLLER);
        this.jobClusterHelixManager = buildHelixManager(this.config, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, InstanceType.ADMINISTRATOR);
        this.jobClusterHelixAdmin = buildHelixAdmin(this.config);
        this.dedicatedJobClusterController = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.DEDICATED_JOB_CLUSTER_CONTROLLER_ENABLED, true);
        if (this.dedicatedJobClusterController) {
            this.jobClusterController = Optional.of(buildHelixManager(this.config, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, InstanceType.CONTROLLER));
        }
        if (this.dedicatedTaskDriverCluster) {
            this.taskDriverHelixManager = Optional.of(buildHelixManager(this.config, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, InstanceType.ADMINISTRATOR));
            if (ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_CONTROLLER_ENABLED, true)) {
                this.taskDriverClusterController = Optional.of(buildHelixManager(this.config, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, InstanceType.CONTROLLER));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    public void connect() {
        try {
            this.isLeader = false;
            this.managerClusterHelixManager.connect();
            if (this.dedicatedManagerCluster) {
                if (this.jobClusterController.isPresent()) {
                    this.jobClusterController.get().connect();
                }
                if (this.dedicatedTaskDriverCluster && this.taskDriverClusterController.isPresent()) {
                    this.taskDriverClusterController.get().connect();
                }
                this.jobClusterHelixManager.connect();
                if (this.taskDriverHelixManager.isPresent()) {
                    this.taskDriverHelixManager.get().connect();
                }
            }
            this.jobClusterHelixManager.addLiveInstanceChangeListener(new GobblinLiveInstanceChangeListener());
            this.jobClusterHelixManager.getMessagingService().registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), this.userDefinedMessageHandlerFactory);
            this.jobClusterHelixManager.getMessagingService().registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, new ControllerShutdownMessageHandlerFactory());
            if (this.isStandaloneMode) {
                this.managerClusterHelixManager.addControllerListener(this::handleLeadershipChange);
            }
        } catch (Exception e) {
            log.error("HelixManager failed to connect", e);
            throw Throwables.propagate(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isConnected() {
        return this.managerClusterHelixManager.isConnected() && this.jobClusterHelixManager.isConnected();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void disconnect() {
        if (this.managerClusterHelixManager.isConnected()) {
            this.managerClusterHelixManager.disconnect();
        }
        if (this.dedicatedManagerCluster) {
            if (this.jobClusterHelixManager.isConnected()) {
                this.jobClusterHelixManager.disconnect();
            }
            if (this.taskDriverHelixManager.isPresent()) {
                this.taskDriverHelixManager.get().disconnect();
            }
            if (this.jobClusterController.isPresent() && this.jobClusterController.get().isConnected()) {
                this.jobClusterController.get().disconnect();
            }
            if (this.taskDriverClusterController.isPresent() && this.taskDriverClusterController.get().isConnected()) {
                this.taskDriverClusterController.get().disconnect();
            }
        }
    }

    @VisibleForTesting
    void handleLeadershipChange(NotificationContext notificationContext) {
        this.metrics.clusterLeadershipChange.update(1);
        if (!this.managerClusterHelixManager.isLeader()) {
            if (this.isLeader) {
                this.isLeader = false;
                Iterator<LeadershipChangeAwareComponent> it = this.leadershipChangeAwareComponents.iterator();
                while (it.hasNext()) {
                    it.next().becomeStandby();
                }
                return;
            }
            return;
        }
        log.info("Leader notification for {} isLeader {} HM.isLeader {}", new Object[]{this.managerClusterHelixManager.getInstanceName(), Boolean.valueOf(this.isLeader), Boolean.valueOf(this.managerClusterHelixManager.isLeader())});
        if (this.isLeader) {
            return;
        }
        log.info("New Helix Controller leader {}", this.managerClusterHelixManager.getInstanceName());
        cleanUpJobs();
        Iterator<LeadershipChangeAwareComponent> it2 = this.leadershipChangeAwareComponents.iterator();
        while (it2.hasNext()) {
            it2.next().becomeActive();
        }
        this.isLeader = true;
    }

    @VisibleForTesting
    public void cleanUpJobs() {
        cleanUpJobs(this.jobClusterHelixManager);
        this.taskDriverHelixManager.ifPresent(this::cleanUpJobs);
    }

    private void cleanUpJobs(HelixManager helixManager) {
        TaskDriver taskDriver = new TaskDriver(helixManager);
        Map workflows = taskDriver.getWorkflows();
        log.debug("cleanUpJobs workflow count {} workflows {}", Integer.valueOf(workflows.size()), workflows.keySet());
        boolean z = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS, false);
        for (Map.Entry entry : workflows.entrySet()) {
            String str = (String) entry.getKey();
            if ((str.contains(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX) || str.contains(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX)) && !z) {
                log.info("Distributed job {} won't be deleted.", str);
            } else if (((WorkflowConfig) entry.getValue()).getTargetState() != TargetState.DELETE) {
                taskDriver.delete(str);
                log.info("Requested delete of workflowName {}", str);
            }
        }
    }

    public Collection<StandardMetricsBridge.StandardMetrics> getStandardMetricsCollection() {
        return ImmutableList.of(this.metrics);
    }

    public HelixManager getManagerClusterHelixManager() {
        return this.managerClusterHelixManager;
    }

    public HelixManager getJobClusterHelixManager() {
        return this.jobClusterHelixManager;
    }

    public HelixAdmin getJobClusterHelixAdmin() {
        return this.jobClusterHelixAdmin;
    }

    public Optional<HelixManager> getTaskDriverHelixManager() {
        return this.taskDriverHelixManager;
    }

    public boolean isLeader() {
        return this.isLeader;
    }
}
