package org.apache.kylin.streaming.jobs;

import java.io.IOException;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.guava30.shaded.common.eventbus.Subscribe;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.event.StreamingJobDropEvent;
import org.apache.kylin.streaming.event.StreamingJobKillEvent;
import org.apache.kylin.streaming.event.StreamingJobMetaCleanEvent;
import org.apache.kylin.streaming.jobs.scheduler.StreamingScheduler;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.JobKiller;
import org.apache.kylin.streaming.util.MetaInfoUpdater;
import org.apache.spark.launcher.SparkAppHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/StreamingJobListener.class */
public class StreamingJobListener implements SparkAppHandle.Listener {
    private static final Logger logger = LoggerFactory.getLogger(StreamingJobListener.class);
    private String project;
    private String jobId;
    private String runnable;

    public StreamingJobListener() {
    }

    public StreamingJobListener(String str, String str2) {
        this.project = str;
        this.jobId = str2;
    }

    public void stateChanged(SparkAppHandle sparkAppHandle) {
        if (!sparkAppHandle.getState().isFinal()) {
            if (this.runnable == null && SparkAppHandle.State.RUNNING == sparkAppHandle.getState()) {
                this.runnable = "true";
                MetaInfoUpdater.updateJobState(this.project, this.jobId, JobStatusEnum.RUNNING);
                return;
            }
            return;
        }
        this.runnable = null;
        StreamingJobMeta streamingJobByUuid = StreamingJobManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project).getStreamingJobByUuid(this.jobId);
        if (isFailed(sparkAppHandle.getState()) && !streamingJobByUuid.isSkipListener()) {
            logger.warn("The streaming job {} has terminated unexpectedly…", this.jobId);
            sparkAppHandle.kill();
            JobKiller.killProcess(streamingJobByUuid);
            JobKiller.killApplication(this.jobId);
            MetaInfoUpdater.updateJobState(this.project, this.jobId, Sets.newHashSet(JobStatusEnum.ERROR, JobStatusEnum.STOPPED), JobStatusEnum.ERROR);
            return;
        }
        if (isFinished(sparkAppHandle.getState())) {
            sparkAppHandle.stop();
            JobKiller.killProcess(streamingJobByUuid);
            JobKiller.killApplication(this.jobId);
            MetaInfoUpdater.updateJobState(this.project, this.jobId, Sets.newHashSet(JobStatusEnum.ERROR, JobStatusEnum.STOPPED), JobStatusEnum.STOPPED);
        }
    }

    private boolean isFailed(SparkAppHandle.State state) {
        return SparkAppHandle.State.FAILED == state || SparkAppHandle.State.KILLED == state || SparkAppHandle.State.LOST == state;
    }

    private boolean isFinished(SparkAppHandle.State state) {
        return SparkAppHandle.State.FINISHED == state;
    }

    public void infoChanged(SparkAppHandle sparkAppHandle) {
    }

    @Subscribe
    public void onStreamingJobKill(StreamingJobKillEvent streamingJobKillEvent) {
        String modelId = streamingJobKillEvent.getModelId();
        StreamingScheduler streamingScheduler = StreamingScheduler.getInstance(streamingJobKillEvent.getProject());
        streamingScheduler.killJob(modelId, JobTypeEnum.STREAMING_MERGE, JobStatusEnum.STOPPED);
        streamingScheduler.killJob(modelId, JobTypeEnum.STREAMING_BUILD, JobStatusEnum.STOPPED);
    }

    @Subscribe
    public void onStreamingJobDrop(StreamingJobDropEvent streamingJobDropEvent) {
        String modelId = streamingJobDropEvent.getModelId();
        StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(KylinConfig.getInstanceFromEnv(), streamingJobDropEvent.getProject());
        String jobId = StreamingUtils.getJobId(modelId, JobTypeEnum.STREAMING_BUILD.toString());
        String jobId2 = StreamingUtils.getJobId(modelId, JobTypeEnum.STREAMING_MERGE.toString());
        streamingJobManager.deleteStreamingJob(jobId);
        streamingJobManager.deleteStreamingJob(jobId2);
    }

    @Subscribe
    public void onStreamingJobMetaCleanEvent(StreamingJobMetaCleanEvent streamingJobMetaCleanEvent) {
        List<Path> deletedMetaPath = streamingJobMetaCleanEvent.getDeletedMetaPath();
        if (CollectionUtils.isEmpty(deletedMetaPath)) {
            logger.debug("path list is empty, skip to delete.");
        } else {
            logger.info("begin to delete streaming meta path size:{}", Integer.valueOf(deletedMetaPath.size()));
            deletedMetaPath.forEach(path -> {
                try {
                    logger.debug("delete streaming meta {} path:{}", Boolean.valueOf(HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), path)), path);
                } catch (IOException e) {
                    logger.warn("delete streaming meta path:{} error", path, e);
                }
            });
        }
    }
}
