package org.apache.kylin.engine.mr.common;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.exception.MapReduceException;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.constant.JobStepStatusEnum;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-engine-mr-3.0.1.jar:org/apache/kylin/engine/mr/common/MapReduceExecutable.class */
public class MapReduceExecutable extends AbstractExecutable {
    /** Extra-info key under which the MapReduce wait time is stored. */
    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";

    /** Logger shared by this class (protected, so also visible to subclasses). */
    protected static final Logger logger = LoggerFactory.getLogger(MapReduceExecutable.class);

    /** Parameter key holding the class name of the Hadoop job to run. */
    private static final String KEY_MR_JOB = "MR_JOB_CLASS";
    /** Parameter key holding the whitespace-separated job arguments. */
    private static final String KEY_PARAMS = "MR_JOB_PARAMS";
    /** Parameter key holding the comma-separated counter alias names. */
    private static final String KEY_COUNTER_SAVEAS = "MR_COUNTER_SAVEAS";

    /** Command-line option descriptors; both are assigned in the static initializer. */
    private static final Option OPTION_JOB_CONF;
    private static final Option OPTION_CUBE_NAME;

    /** Held while the ephemeral distributed lock is being requested in {@code getLock}. */
    private final Lock threadLock = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kylin-engine-mr-3.0.1.jar:org/apache/kylin/engine/mr/common/MapReduceExecutable$CustomParser.class */
    /**
     * GNU-style parser that does not reject options it does not know about.
     * Unknown options are collected, together with the token that follows them,
     * so the caller can forward them untouched to the actual Hadoop job.
     */
    public static class CustomParser extends GnuParser {
        /** Unknown options and their values, in the order they were encountered. */
        private final List<String> remainingArgs = Lists.newArrayList();

        /**
         * Delegates known options to the parent parser and records everything else.
         *
         * @param str          the option token currently being processed
         * @param listIterator iterator over the remaining command-line tokens
         * @throws ParseException if the parent parser rejects a known option
         */
        protected void processOption(String str, ListIterator listIterator) throws ParseException {
            if (getOptions().hasOption(str)) {
                super.processOption(str, listIterator);
                return;
            }
            this.remainingArgs.add(str);
            // An unknown option may be the very last token; only consume a value
            // when one is actually present instead of failing with NoSuchElementException.
            if (listIterator.hasNext()) {
                this.remainingArgs.add(listIterator.next().toString());
            }
        }

        /**
         * @return the live list of tokens that were not recognised as declared options
         */
        public List<String> getRemainingArgs() {
            return this.remainingArgs;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Decides, when the step starts, whether it is a fresh start or a resume.
     *
     * <p>If the stored output has no {@code startTime} the default start handling
     * of the superclass runs. Otherwise, when a {@code mr_job_id} was recorded and
     * the Hadoop cluster still knows that job in a non-FAILED state, the step is
     * only flipped back to RUNNING so that {@code doWork} can re-attach to it.
     * Any failure while querying the cluster falls back to the default handling.
     *
     * @param executableContext context supplying the Kylin configuration
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    public void onExecuteStart(ExecutableContext executableContext) {
        Output output = getOutput();
        if (!output.getExtra().containsKey("startTime")) {
            super.onExecuteStart(executableContext);
            return;
        }
        String mrJobId = output.getExtra().get("mr_job_id");
        if (mrJobId == null) {
            getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
            return;
        }
        try {
            Configuration configuration = new Configuration(HadoopUtil.getCurrentConfiguration());
            overwriteJobConf(configuration, executableContext.getConfig(), getMapReduceParams().trim().split("\\s+"));
            Job job = new Cluster(configuration).getJob(JobID.forName(mrJobId));
            if (job == null || job.getJobState() == JobStatus.State.FAILED) {
                super.onExecuteStart(executableContext);
            } else {
                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
            }
        } catch (IOException | ParseException e) {
            // Keep the cause in the log; without it the failure is undiagnosable.
            logger.warn("error get hadoop status", e);
            super.onExecuteStart(executableContext);
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            logger.warn("error get hadoop status", e);
            super.onExecuteStart(executableContext);
        }
    }

    /**
     * Submits (or re-attaches to) the configured Hadoop job and polls it until it
     * completes, or until this step is discarded or paused.
     *
     * <p>When {@link #getIsNeedLock()} is true a distributed lock is taken before
     * the job is started. Whenever the step turns out to be discarded, that lock is
     * released and a {@code DISCARDED} result is returned instead of the regular one.
     *
     * @param executableContext context supplying the Kylin configuration
     * @return the outcome of the step; failures are reported through the result
     *         rather than thrown
     * @throws ExecuteException declared by the overridden method
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    protected ExecuteResult doWork(ExecutableContext executableContext) throws ExecuteException {
        Job job;
        String mapReduceJobClass = getMapReduceJobClass();
        DistributedLock distributedLock = null;
        Preconditions.checkNotNull(mapReduceJobClass);
        try {
            if (getIsNeedLock()) {
                distributedLock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
                getLock(distributedLock);
            }
            ExecutableManager manager = getManager();
            Configuration configuration = new Configuration(HadoopUtil.getCurrentConfiguration());
            String[] jobArgs = overwriteJobConf(configuration, executableContext.getConfig(), getMapReduceParams().trim().split("\\s+"));
            Map<String, String> extra = manager.getOutput(getId()).getExtra();
            if (extra.containsKey("mr_job_id")) {
                // A job id was recorded by an earlier run: re-attach instead of resubmitting.
                job = new Cluster(configuration).getJob(JobID.forName(extra.get("mr_job_id")));
                logger.info("mr_job_id:" + extra.get("mr_job_id") + " resumed");
            } else {
                AbstractHadoopJob hadoopJob = (AbstractHadoopJob) ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor().newInstance();
                hadoopJob.setConf(configuration);
                hadoopJob.setAsync(true);
                logger.info("parameters of the MapReduceExecutable: {}", getMapReduceParams());
                try {
                    hadoopJob.run(jobArgs);
                    if (hadoopJob.isSkipped()) {
                        if (isDiscarded()) {
                            return discardedResult(distributedLock, "skipped");
                        }
                        return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped");
                    }
                    job = hadoopJob.getJob();
                } catch (Exception e) {
                    logger.error("error execute " + toString(), e);
                    StringWriter stackTrace = new StringWriter();
                    e.printStackTrace(new PrintWriter(stackTrace));
                    StringBuilder message = new StringBuilder();
                    message.append(stackTrace.toString()).append("\n");
                    message.append("result code:").append(2);
                    if (isDiscarded()) {
                        return discardedResult(distributedLock, message.toString());
                    }
                    return new ExecuteResult(ExecuteResult.State.ERROR, message.toString(), e);
                }
            }
            StringBuilder output = new StringBuilder();
            HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
            JobStepStatusEnum status = JobStepStatusEnum.NEW;
            while (!isDiscarded() && !isPaused()) {
                JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output);
                // NOTE(review): this tests the status seen on the PREVIOUS poll, so a kill
                // is only reported one polling interval after it was first observed.
                // Left unchanged; confirm whether that delay is intended.
                if (status == JobStepStatusEnum.KILLED) {
                    manager.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
                    if (isDiscarded()) {
                        return discardedResult(distributedLock, "killed by admin");
                    }
                    return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
                }
                // First transition out of WAITING: record how long the job sat in the queue.
                if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
                    setMapReduceWaitTime(System.currentTimeMillis() - getStartTime());
                }
                manager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
                status = newStatus;
                if (status.isComplete()) {
                    Map<String, String> info = hadoopCmdOutput.getInfo();
                    readCounters(hadoopCmdOutput, info);
                    manager.addJobInfo(getId(), info);
                    if (isDiscarded()) {
                        return discardedResult(distributedLock, output.toString());
                    }
                    if (status == JobStepStatusEnum.FINISHED) {
                        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
                    }
                    return ExecuteResult.createFailed(new MapReduceException(output.toString()));
                }
                Thread.sleep(executableContext.getConfig().getYarnStatusCheckIntervalSeconds() * 1000);
            }
            // The loop ended because the step was discarded or paused: stop the Hadoop job.
            if (job != null) {
                try {
                    job.killJob();
                } catch (Exception e) {
                    logger.warn("failed to kill hadoop job: " + job.getJobID(), e);
                }
            }
            if (isDiscarded()) {
                return discardedResult(distributedLock, output.toString());
            }
            return new ExecuteResult(ExecuteResult.State.STOPPED, output.toString());
        } catch (ReflectiveOperationException e) {
            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
            if (isDiscarded()) {
                // Release the lock actually held by this run (it may still be null
                // when the failure happened before the lock was obtained).
                return discardedResult(distributedLock, e.getMessage());
            }
            return ExecuteResult.createError(e);
        } catch (Exception e) {
            logger.error("error execute " + toString(), e);
            if (isDiscarded()) {
                return discardedResult(distributedLock, e.getMessage());
            }
            return ExecuteResult.createError(e);
        }
    }

    /**
     * Releases the distributed lock (when locking is enabled and a lock handle
     * exists) and builds the {@code DISCARDED} result for this step.
     *
     * @param lock   lock handle obtained in {@code doWork}, or null if none was obtained
     * @param output text to attach to the result
     */
    private ExecuteResult discardedResult(DistributedLock lock, String output) {
        if (lock != null && getIsNeedLock()) {
            releaseLock(lock);
        }
        return new ExecuteResult(ExecuteResult.State.DISCARDED, output);
    }

    /**
     * Refreshes the job counters and copies them into {@code map}: always under the
     * standard keys and, when counter aliases are configured, also under those.
     *
     * @param hadoopCmdOutput source of the counter values
     * @param map             job info map that receives the counters
     */
    private void readCounters(HadoopCmdOutput hadoopCmdOutput, Map<String, String> map) {
        hadoopCmdOutput.updateJobCounter();

        map.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
        map.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getRawInputBytesRead());
        map.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());

        String saveAs = getParam(KEY_COUNTER_SAVEAS);
        if (saveAs == null) {
            return;
        }
        // Positional aliases: records count, input bytes, HDFS bytes written.
        String[] aliases = saveAs.split(",");
        saveCounterAs(hadoopCmdOutput.getMapInputRecords(), aliases, 0, map);
        saveCounterAs(hadoopCmdOutput.getRawInputBytesRead(), aliases, 1, map);
        saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), aliases, 2, map);
    }

    /**
     * Stores {@code str} in {@code map} under the trimmed alias at position {@code i},
     * provided that position exists in {@code strArr} and is not blank.
     *
     * @param str    counter value to store
     * @param strArr alias names, indexed by counter position
     * @param i      position of the alias to use
     * @param map    destination map
     */
    private void saveCounterAs(String str, String[] strArr, int i, Map<String, String> map) {
        boolean aliasPresent = i < strArr.length && !StringUtils.isBlank(strArr[i]);
        if (aliasPresent) {
            map.put(strArr[i].trim(), str);
        }
    }

    /**
     * @return the value stored under {@link #MAP_REDUCE_WAIT_TIME} in the extra info,
     *         with {@code 0} passed as the default
     */
    public long getMapReduceWaitTime() {
        final long fallback = 0L;
        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, fallback);
    }

    /**
     * Stores the wait time in the extra info under {@link #MAP_REDUCE_WAIT_TIME}.
     *
     * @param j wait time; {@code doWork} passes a difference of
     *          {@code System.currentTimeMillis()} values
     */
    public void setMapReduceWaitTime(long j) {
        addExtraInfo(MAP_REDUCE_WAIT_TIME, String.valueOf(j));
    }

    /**
     * @return the value of the {@code MR_JOB_CLASS} parameter, i.e. the class name
     *         written by {@link #setMapReduceJobClass(Class)}
     * @throws ExecuteException declared for callers; not thrown by this implementation
     */
    public String getMapReduceJobClass() throws ExecuteException {
        final String jobClassName = getParam(KEY_MR_JOB);
        return jobClassName;
    }

    /**
     * Records which Hadoop job class this step should run.
     *
     * @param cls job class; its fully qualified name is stored under {@code MR_JOB_CLASS}
     */
    public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> cls) {
        final String jobClassName = cls.getName();
        setParam(KEY_MR_JOB, jobClassName);
    }

    /**
     * @return the value of the {@code MR_JOB_PARAMS} parameter; callers in this class
     *         split it on whitespace to obtain the job arguments
     */
    public String getMapReduceParams() {
        final String params = getParam(KEY_PARAMS);
        return params;
    }

    /**
     * Stores the job argument string under {@code MR_JOB_PARAMS}.
     *
     * @param str job arguments as a single string
     */
    public void setMapReduceParams(String str) {
        this.setParam(KEY_PARAMS, str);
    }

    /**
     * Stores the counter alias list under {@code MR_COUNTER_SAVEAS}.
     *
     * @param str comma-separated alias names; {@code readCounters} maps positions
     *            0, 1 and 2 to input records, input bytes and HDFS bytes written
     */
    public void setCounterSaveAs(String str) {
        this.setParam(KEY_COUNTER_SAVEAS, str);
    }

    /**
     * Stores the string form of the flag under the {@code isNeedLock} parameter.
     *
     * @param bool whether {@code doWork} should take the distributed lock
     */
    public void setIsNeedLock(Boolean bool) {
        final String flag = String.valueOf(bool);
        setParam("isNeedLock", flag);
    }

    /**
     * @return the {@code isNeedLock} parameter parsed as a boolean;
     *         {@code false} when it is missing or empty
     */
    public boolean getIsNeedLock() {
        final String flag = getParam("isNeedLock");
        return !Strings.isNullOrEmpty(flag) && Boolean.parseBoolean(flag);
    }

    /**
     * Stores the string form of the flag under the {@code isNeedReleaseLock} parameter.
     *
     * @param bool flag value to persist
     */
    public void setIsNeedReleaseLock(Boolean bool) {
        final String flag = String.valueOf(bool);
        setParam("isNeedReleaseLock", flag);
    }

    /**
     * @return the {@code isNeedReleaseLock} parameter parsed as a boolean;
     *         {@code false} when it is missing or empty
     */
    public boolean getIsNeedReleaseLock() {
        final String flag = getParam("isNeedReleaseLock");
        return !Strings.isNullOrEmpty(flag) && Boolean.parseBoolean(flag);
    }

    /**
     * Stores the base lock path name under the {@code lockPathName} parameter.
     *
     * @param str name from which the lock paths of this step are derived
     */
    public void setLockPathName(String str) {
        this.setParam("lockPathName", str);
    }

    /**
     * @return the value of the {@code lockPathName} parameter
     */
    public String getLockPathName() {
        final String lockPathName = getParam("lockPathName");
        return lockPathName;
    }

    /**
     * Stores the job-flow job id under the {@code jobFlowJobId} parameter.
     *
     * @param str id that is combined with the lock path name to build the job lock path
     */
    public void setJobFlowJobId(String str) {
        this.setParam("jobFlowJobId", str);
    }

    /**
     * @return the value of the {@code jobFlowJobId} parameter
     */
    public String getJobFlowJobId() {
        final String jobFlowJobId = getParam("jobFlowJobId");
        return jobFlowJobId;
    }

    /**
     * Blocks until this job owns the cube job lock.
     *
     * <p>If the job-specific lock path is already locked, only the ephemeral lock is
     * (re)taken. Otherwise the method loops: whenever neither the parent lock path
     * nor the ephemeral path is locked it takes the ephemeral lock and then the
     * permanent job lock; if the permanent lock cannot be obtained the ephemeral
     * lock is given back. After every unsuccessful round it waits one minute.
     *
     * @param distributedLock lock client to operate on; must not be null
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void getLock(DistributedLock distributedLock) throws InterruptedException {
        logger.info("{} try to get zk lock, zk client {} ", getId(), distributedLock.getClient());
        String ephemeralLockPath = getEphemeralLockPathName();
        String jobLockPath = getCubeJobLockPathName();
        long startMillis = System.currentTimeMillis();

        boolean jobPathLocked = distributedLock.isLocked(jobLockPath);
        logger.info("cube job {} zk lock is isLockedByTheJob:{}", getId(), Boolean.valueOf(jobPathLocked));
        if (jobPathLocked) {
            distributedLock.lock(ephemeralLockPath);
        } else {
            boolean lockedByOther = true;
            while (lockedByOther) {
                lockedByOther = distributedLock.isLocked(getCubeJobLockParentPathName());
                if (!lockedByOther) {
                    lockedByOther = distributedLock.isLocked(ephemeralLockPath);
                    logger.info("zookeeper lock path :{}, is locked by other job result is {}", ephemeralLockPath, Boolean.valueOf(lockedByOther));
                    if (!lockedByOther) {
                        boolean gotEphemeral;
                        logger.debug("{} before start to get lock ephemeralLockPath {}", getId(), ephemeralLockPath);
                        // Acquire outside the try so the finally block only ever
                        // unlocks a lock this thread really holds, and exactly once.
                        this.threadLock.lock();
                        try {
                            logger.debug("{} start to get lock ephemeralLockPath {}", getId(), ephemeralLockPath);
                            gotEphemeral = distributedLock.lock(ephemeralLockPath);
                            logger.debug("{} finish get lock ephemeralLockPath {},getLocked {}", getId(), ephemeralLockPath, Boolean.valueOf(gotEphemeral));
                        } finally {
                            this.threadLock.unlock();
                            logger.debug("{} finish unlock the thread lock ,ephemeralLockPath {} ", getId(), ephemeralLockPath);
                        }
                        if (gotEphemeral) {
                            try {
                                if (distributedLock.globalPermanentLock(jobLockPath)) {
                                    break;
                                }
                                // Permanent lock refused: give the ephemeral lock back.
                                if (distributedLock.isLocked(ephemeralLockPath)) {
                                    distributedLock.unlock(ephemeralLockPath);
                                }
                            } catch (Exception e) {
                                logger.warn("failed to get permanent lock " + jobLockPath + ", releasing ephemeral lock " + ephemeralLockPath, e);
                                if (distributedLock.isLocked(ephemeralLockPath)) {
                                    distributedLock.unlock(ephemeralLockPath);
                                }
                            }
                        }
                        // Not acquired in this round; keep looping.
                        lockedByOther = true;
                    }
                }
                // Every unsuccessful round backs off, including the case where only the
                // ephemeral path is locked, so the lock service is not polled in a tight loop.
                logger.info("{}, parent lock path({}) is locked by other job result is {} ,ephemeral lock path :{} is locked by other job result is {},will try after one minute", getId(), getCubeJobLockParentPathName(), Boolean.valueOf(lockedByOther), ephemeralLockPath, Boolean.valueOf(lockedByOther));
                Thread.sleep(60000L);
            }
        }
        logger.info("job {} get zookeeper lock path:{} success,zookeeper get lock costTime : {} s", getId(), jobLockPath, Long.valueOf((System.currentTimeMillis() - startMillis) / 1000));
    }

    /**
     * Purges the locks of this cube job. Nothing is purged unless the job-specific
     * lock path is currently locked; in that case the parent lock path is purged
     * and, if it is locked too, the ephemeral lock path as well.
     *
     * @param distributedLock lock client to operate on; a null handle is tolerated
     *                        and results in a warning only
     */
    private void releaseLock(DistributedLock distributedLock) {
        if (distributedLock == null) {
            // Callers may reach this before a lock handle was ever obtained.
            logger.warn("{} has no distributed lock handle, skip releasing cube job lock", getId());
            return;
        }
        String parentLockPath = getCubeJobLockParentPathName();
        String ephemeralLockPath = getEphemeralLockPathName();
        if (!distributedLock.isLocked(getCubeJobLockPathName())) {
            return;
        }
        distributedLock.purgeLocks(parentLockPath);
        logger.info("{} unlock cube job dict lock path({}) success", getJobFlowJobId(), parentLockPath);
        if (distributedLock.isLocked(ephemeralLockPath)) {
            distributedLock.purgeLocks(ephemeralLockPath);
            logger.info("{} unlock cube job ephemeral lock path({}) success", getJobFlowJobId(), ephemeralLockPath);
        }
    }

    /**
     * @return the ephemeral lock path derived from the configured lock path name
     * @throws IllegalArgumentException if no lock path name is configured
     */
    private String getEphemeralLockPathName() {
        final String baseName = getLockPathName();
        if (!Strings.isNullOrEmpty(baseName)) {
            return CubeJobLockUtil.getEphemeralLockPath(baseName);
        }
        throw new IllegalArgumentException("cube job lock path name is null");
    }

    /**
     * @return the lock path built from the configured lock path name and job-flow job id
     * @throws IllegalArgumentException if either of the two values is missing
     */
    private String getCubeJobLockPathName() {
        final String baseName = getLockPathName();
        if (Strings.isNullOrEmpty(baseName)) {
            throw new IllegalArgumentException("cube job lock path name is null");
        }

        final String flowJobId = getJobFlowJobId();
        if (Strings.isNullOrEmpty(flowJobId)) {
            throw new IllegalArgumentException("cube job lock path flowJobId is null");
        }

        return CubeJobLockUtil.getLockPath(baseName, flowJobId);
    }

    /**
     * @return the lock path built from the configured lock path name with no job id
     *         (null is passed in its place)
     * @throws IllegalArgumentException if no lock path name is configured
     */
    private String getCubeJobLockParentPathName() {
        final String baseName = getLockPathName();
        if (!Strings.isNullOrEmpty(baseName)) {
            return CubeJobLockUtil.getLockPath(baseName, null);
        }
        throw new IllegalArgumentException(" create mr hive dict lock path name is null");
    }

    /**
     * Applies job-level overrides to {@code configuration} and rebuilds the argument
     * list that is handed to the Hadoop job.
     *
     * <p>The conf option, when present, is added as a configuration resource and is
     * not forwarded. The cube name option, when present, causes the cube's MR
     * overrides (and, if {@code mapreduce.job.is-mem-hungry} is true afterwards, its
     * mem-hungry overrides) to be copied into {@code configuration}; it is forwarded
     * as {@code -cubename <name>}. All other arguments are forwarded as collected by
     * {@link CustomParser}.
     *
     * @param configuration Hadoop configuration to mutate
     * @param kylinConfig   Kylin configuration used to look up the cube
     * @param strArr        raw job arguments
     * @return the arguments to pass on to the job
     * @throws ParseException if the arguments cannot be parsed
     */
    private String[] overwriteJobConf(Configuration configuration, KylinConfig kylinConfig, String[] strArr) throws ParseException {
        Options options = new Options();
        options.addOption(OPTION_JOB_CONF);
        options.addOption(OPTION_CUBE_NAME);
        CustomParser parser = new CustomParser();
        CommandLine commandLine = parser.parse(options, strArr);

        String confFile = commandLine.getOptionValue(BatchConstants.ARG_CONF);
        String cubeName = commandLine.getOptionValue(BatchConstants.ARG_CUBE_NAME);
        List<String> jobArgs = new ArrayList<>();

        if (StringUtils.isNotBlank(confFile)) {
            configuration.addResource(new Path(confFile));
        }
        if (StringUtils.isNotBlank(cubeName)) {
            // Resolve the cube-level config once; both override maps come from it.
            KylinConfig cubeConfig = CubeManager.getInstance(kylinConfig).getCube(cubeName).getConfig();
            for (Map.Entry<String, String> entry : cubeConfig.getMRConfigOverride().entrySet()) {
                configuration.set(entry.getKey(), entry.getValue());
            }
            // parseBoolean(null) is false, so no separate null check is needed.
            if (Boolean.parseBoolean(configuration.get("mapreduce.job.is-mem-hungry"))) {
                for (Map.Entry<String, String> entry : cubeConfig.getMemHungryConfigOverride().entrySet()) {
                    configuration.set(entry.getKey(), entry.getValue());
                }
            }
            jobArgs.add("-cubename");
            jobArgs.add(cubeName);
        }
        jobArgs.addAll(parser.getRemainingArgs());
        return jobArgs.toArray(new String[0]);
    }

    // Builds the two optional, single-argument command-line options understood by
    // overwriteJobConf. OptionBuilder keeps its state statically, so the fluent
    // chain performs the same sequence of calls as separate statements would.
    static {
        OPTION_JOB_CONF = OptionBuilder.withArgName(BatchConstants.ARG_CONF)
                .hasArg()
                .isRequired(false)
                .create(BatchConstants.ARG_CONF);
        OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME)
                .hasArg()
                .isRequired(false)
                .create(BatchConstants.ARG_CUBE_NAME);
    }
}
