package org.apache.gobblin.cluster;

import com.github.rholder.retry.AttemptTimeLimiters;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.commit.CommitStepException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alias("HelixParticipantCheck")
/* loaded from: input_file:org/apache/gobblin/cluster/HelixAssignedParticipantCheck.class */
public class HelixAssignedParticipantCheck implements CommitStep {
    private static final Logger log = LoggerFactory.getLogger(HelixAssignedParticipantCheck.class);

    @VisibleForTesting
    private static volatile HelixManager helixManager = null;
    private static volatile Retryer<Boolean> retryer = RetryerBuilder.newBuilder().retryIfException().withStopStrategy(StopStrategies.stopAfterAttempt(3)).withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(3000, TimeUnit.MILLISECONDS)).build();
    private final String helixInstanceName;
    private final String helixJob;
    private final int partitionNum;
    private final Config config;

    public static void initHelixManager(Config config) throws Exception {
        if (helixManager == null) {
            synchronized (HelixAssignedParticipantCheck.class) {
                if (helixManager == null) {
                    helixManager = HelixManagerFactory.getZKHelixManager(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), HelixAssignedParticipantCheck.class.getSimpleName(), InstanceType.SPECTATOR, config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
                    helixManager.connect();
                }
            }
        }
    }

    private void refreshHelixManager() throws Exception {
        synchronized (HelixAssignedParticipantCheck.class) {
            helixManager.disconnect();
            helixManager = null;
            initHelixManager(this.config);
        }
    }

    public HelixAssignedParticipantCheck(Config config) throws Exception {
        this.config = config;
        initHelixManager(config);
        this.helixInstanceName = config.getString(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY);
        this.helixJob = config.getString(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY);
        this.partitionNum = config.getInt(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY);
    }

    public boolean isCompleted() {
        return false;
    }

    public void execute() throws CommitStepException {
        boolean z;
        log.info(String.format("HelixParticipantCheck step called for Helix Instance: %s, Helix job: %s, Helix partition: %d", this.helixInstanceName, this.helixJob, Integer.valueOf(this.partitionNum)));
        try {
            z = ((Boolean) retryer.call(() -> {
                try {
                    JobContext jobContext = new TaskDriver(helixManager).getJobContext(this.helixJob);
                    if (jobContext == null) {
                        return false;
                    }
                    String assignedParticipant = jobContext.getAssignedParticipant(this.partitionNum);
                    if (assignedParticipant == null) {
                        log.error("The current assigned participant is null. This implies that \n\t\t(a)Helix failed to write to zookeeper, which is often caused by lack of compression leading / exceeding zookeeper jute max buffer size (Default 1MB)\n\t\t(b)Helix reassigned the task (unlikely if this current task has been running without issue. Helix does not have code for reassigning \"running\" tasks)\n\t\tNote: This logic is true as of Helix version 1.0.2 and ZK version 3.6");
                        return false;
                    }
                    boolean equalsIgnoreCase = assignedParticipant.equalsIgnoreCase(this.helixInstanceName);
                    if (!equalsIgnoreCase) {
                        log.info("The current helix instance is not the assigned participant. helixInstanceName={}, assignedParticipant={}", this.helixInstanceName, assignedParticipant);
                    }
                    return Boolean.valueOf(equalsIgnoreCase);
                } catch (Exception e) {
                    log.info("Encountered exception when executing " + getClass().getSimpleName(), e);
                    log.info("Refreshing Helix manager..");
                    refreshHelixManager();
                    throw e;
                }
            })).booleanValue();
        } catch (ExecutionException | RetryException e) {
            log.error("Cannot complete participant assignment check within the retry limit due to: {}", e);
            z = true;
        }
        if (!z) {
            throw new CommitStepException(String.format("Helix instance %s not the assigned participant for partition %d", this.helixInstanceName, Integer.valueOf(this.partitionNum)));
        }
    }

    public static HelixManager getHelixManager() {
        return helixManager;
    }
}
