package org.apache.inlong.manager.plugin.flink;

import org.apache.flink.api.common.JobStatus;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.class */
public class IntegrationTaskRunner implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(IntegrationTaskRunner.class);
    private static final Integer TRY_MAX_TIMES = 60;
    private static final Integer INTERVAL = 10;
    private final FlinkService flinkService;
    private final FlinkInfo flinkInfo;
    private final Integer commitType;

    public IntegrationTaskRunner(FlinkService flinkService, FlinkInfo flinkInfo, Integer num) {
        this.flinkService = flinkService;
        this.flinkInfo = flinkInfo;
        this.commitType = num;
    }

    @Override // java.lang.Runnable
    public void run() {
        int i;
        TaskCommitType taskCommitType = TaskCommitType.getInstance(this.commitType);
        if (taskCommitType == null) {
            taskCommitType = TaskCommitType.START_NOW;
        }
        switch (taskCommitType) {
            case START_NOW:
                try {
                    String submit = this.flinkService.submit(this.flinkInfo);
                    this.flinkInfo.setJobId(submit);
                    log.info("Start job {} success in backend", submit);
                    return;
                } catch (Exception e) {
                    String format = String.format("Start job %s failed in backend exception[%s]", this.flinkInfo.getJobId(), FlinkUtils.getExceptionStackMsg(e));
                    log.warn(format);
                    this.flinkInfo.setException(true);
                    this.flinkInfo.setExceptionMsg(format);
                    return;
                }
            case RESUME:
                try {
                    log.info("Restore job {} success in backend", this.flinkService.restore(this.flinkInfo));
                    return;
                } catch (Exception e2) {
                    String format2 = String.format("Restore job %s failed in backend exception[%s]", this.flinkInfo.getJobId(), FlinkUtils.getExceptionStackMsg(e2));
                    log.warn(format2);
                    this.flinkInfo.setException(true);
                    this.flinkInfo.setExceptionMsg(format2);
                    return;
                }
            case RESTART:
                try {
                    StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
                    FlinkConfig flinkConfig = this.flinkService.getFlinkConfig();
                    stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
                    stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
                    String stopJob = this.flinkService.stopJob(this.flinkInfo.getJobId(), stopWithSavepointRequest);
                    this.flinkInfo.setSavepointPath(stopJob);
                    log.info("the jobId: {} savepoint: {} ", this.flinkInfo.getJobId(), stopJob);
                    i = 0;
                } catch (Exception e3) {
                    String format3 = String.format("Restart job %s failed in backend exception[%s]", this.flinkInfo.getJobId(), FlinkUtils.getExceptionStackMsg(e3));
                    log.warn(format3);
                    this.flinkInfo.setException(true);
                    this.flinkInfo.setExceptionMsg(format3);
                    return;
                }
                while (true) {
                    if (i < TRY_MAX_TIMES.intValue()) {
                        JobStatus jobStatus = this.flinkService.getJobStatus(this.flinkInfo.getJobId());
                        if (jobStatus == JobStatus.FINISHED) {
                            try {
                                log.info("Restore job {} success in backend", this.flinkService.restore(this.flinkInfo));
                            } catch (Exception e4) {
                                log.error("Restore job failed in backend", e4);
                            }
                        } else {
                            log.info("Try start job  but the job {} is {}", this.flinkInfo.getJobId(), jobStatus.toString());
                            try {
                                Thread.sleep(INTERVAL.intValue() * 1000);
                            } catch (Exception e5) {
                                e5.printStackTrace();
                            }
                            i++;
                        }
                        String format32 = String.format("Restart job %s failed in backend exception[%s]", this.flinkInfo.getJobId(), FlinkUtils.getExceptionStackMsg(e3));
                        log.warn(format32);
                        this.flinkInfo.setException(true);
                        this.flinkInfo.setExceptionMsg(format32);
                        return;
                    }
                }
                log.info("Restart job {} success in backend", this.flinkInfo.getJobId());
                return;
            case STOP:
                try {
                    StopWithSavepointRequest stopWithSavepointRequest2 = new StopWithSavepointRequest();
                    FlinkConfig flinkConfig2 = this.flinkService.getFlinkConfig();
                    stopWithSavepointRequest2.setDrain(flinkConfig2.isDrain());
                    stopWithSavepointRequest2.setTargetDirectory(flinkConfig2.getSavepointDirectory());
                    String stopJob2 = this.flinkService.stopJob(this.flinkInfo.getJobId(), stopWithSavepointRequest2);
                    this.flinkInfo.setSavepointPath(stopJob2);
                    log.info("the jobId {} savepoint: {} ", this.flinkInfo.getJobId(), stopJob2);
                    return;
                } catch (Exception e6) {
                    String format4 = String.format("stop job %s failed in backend exception[%s]", this.flinkInfo.getJobId(), FlinkUtils.getExceptionStackMsg(e6));
                    log.warn(format4);
                    this.flinkInfo.setException(true);
                    this.flinkInfo.setExceptionMsg(format4);
                    return;
                }
            case DELETE:
                try {
                    this.flinkService.cancelJob(this.flinkInfo.getJobId());
                    log.info("delete job {} success in backend", this.flinkInfo.getJobId());
                    if (this.flinkService.getJobStatus(this.flinkInfo.getJobId()).isTerminalState()) {
                        log.info("delete job {} success in backend", this.flinkInfo.getJobId());
                    } else {
                        log.info("delete job {} failed in backend", this.flinkInfo.getJobId());
                    }
                    return;
                } catch (Exception e7) {
                    String format5 = String.format("delete job %s failed in backend exception[%s]", this.flinkInfo.getJobId(), FlinkUtils.getExceptionStackMsg(e7));
                    log.warn(format5);
                    this.flinkInfo.setException(true);
                    this.flinkInfo.setExceptionMsg(format5);
                    return;
                }
            default:
                this.flinkInfo.setException(true);
                log.warn("not found commitType");
                this.flinkInfo.setExceptionMsg("not found commitType");
                return;
        }
    }
}
