package org.apache.seatunnel.core.starter.flink.execution;

import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TernaryBoolean;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.class */
public abstract class AbstractFlinkRuntimeEnvironment implements RuntimeEnvironment {
    private static final Logger log = LoggerFactory.getLogger(AbstractFlinkRuntimeEnvironment.class);
    protected Config config;
    protected StreamExecutionEnvironment environment;
    protected JobMode jobMode;
    protected String jobName = Constants.LOGO;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractFlinkRuntimeEnvironment(Config config) {
        initialize(config);
    }

    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public abstract AbstractFlinkRuntimeEnvironment setConfig(Config config);

    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public Config getConfig() {
        return this.config;
    }

    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public CheckResult checkConfig() {
        return EnvironmentUtil.checkRestartStrategy(this.config);
    }

    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return this.environment;
    }

    protected void setCheckpoint() {
        long j;
        if (this.jobMode == JobMode.BATCH) {
            log.warn("Disabled Checkpointing. In flink execution environment, checkpointing is not supported and not needed when executing jobs in BATCH mode");
        }
        if (this.config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
            j = this.config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
        } else if (this.config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
            log.warn("the parameter 'execution.checkpoint.interval' will be deprecated, please use common parameter 'checkpoint.interval' to set it");
            j = this.config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
        } else {
            j = 10000;
        }
        CheckpointConfig checkpointConfig = this.environment.getCheckpointConfig();
        this.environment.enableCheckpointing(j);
        if (this.config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
            checkpointConfig.setCheckpointTimeout(this.config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()));
        } else if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.CHECKPOINT_TIMEOUT)) {
            checkpointConfig.setCheckpointTimeout(this.config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT));
        }
        if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.CHECKPOINT_MODE)) {
            String string = this.config.getString(ConfigKeyName.CHECKPOINT_MODE);
            String lowerCase = string.toLowerCase();
            boolean z = -1;
            switch (lowerCase.hashCode()) {
                case -286864670:
                    if (lowerCase.equals("exactly-once")) {
                        z = false;
                        break;
                    }
                    break;
                case 2125618495:
                    if (lowerCase.equals("at-least-once")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                    break;
                case true:
                    checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
                    break;
                default:
                    log.warn("set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once", string);
                    break;
            }
        }
        if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.CHECKPOINT_DATA_URI)) {
            FsStateBackend fsStateBackend = new FsStateBackend(this.config.getString(ConfigKeyName.CHECKPOINT_DATA_URI));
            if (!EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.STATE_BACKEND)) {
                this.environment.setStateBackend(fsStateBackend);
            } else if ("rocksdb".equalsIgnoreCase(this.config.getString(ConfigKeyName.STATE_BACKEND))) {
                this.environment.setStateBackend(new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE));
            }
        }
        if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
            checkpointConfig.setMaxConcurrentCheckpoints(this.config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS));
        }
        if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
            if (this.config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
                checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
            } else {
                checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            }
        }
        if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
            checkpointConfig.setMinPauseBetweenCheckpoints(this.config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS));
        }
        if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) {
            checkpointConfig.setTolerableCheckpointFailureNumber(this.config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void createStreamEnvironment() {
        Configuration configuration = new Configuration();
        EnvironmentUtil.initConfiguration(this.config, configuration);
        this.environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        setTimeCharacteristic();
        setCheckpoint();
        EnvironmentUtil.setRestartStrategy(this.config, this.environment.getConfig());
        if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
            this.environment.setBufferTimeout(this.config.getLong(ConfigKeyName.BUFFER_TIMEOUT_MILLIS));
        }
        if (this.config.hasPath(EnvCommonOptions.PARALLELISM.key())) {
            this.environment.setParallelism(this.config.getInt(EnvCommonOptions.PARALLELISM.key()));
        } else if (this.config.hasPath(ConfigKeyName.PARALLELISM)) {
            log.warn("the parameter 'execution.parallelism' will be deprecated, please use common parameter 'parallelism' to set it");
            this.environment.setParallelism(this.config.getInt(ConfigKeyName.PARALLELISM));
        }
        if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.MAX_PARALLELISM)) {
            this.environment.setMaxParallelism(this.config.getInt(ConfigKeyName.MAX_PARALLELISM));
        }
        if (this.jobMode.equals(JobMode.BATCH)) {
            this.environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        }
    }

    private void setTimeCharacteristic() {
        if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.TIME_CHARACTERISTIC)) {
            String string = this.config.getString(ConfigKeyName.TIME_CHARACTERISTIC);
            String lowerCase = string.toLowerCase();
            boolean z = -1;
            switch (lowerCase.hashCode()) {
                case 747457494:
                    if (lowerCase.equals("ingestion-time")) {
                        z = true;
                        break;
                    }
                    break;
                case 938185248:
                    if (lowerCase.equals("event-time")) {
                        z = false;
                        break;
                    }
                    break;
                case 1744288071:
                    if (lowerCase.equals("processing-time")) {
                        z = 2;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    this.environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                    return;
                case true:
                    this.environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
                    return;
                case true:
                    this.environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                    return;
                default:
                    log.warn("set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time", string);
                    return;
            }
        }
    }

    public boolean isStreaming() {
        return JobMode.STREAMING.equals(this.jobMode);
    }

    public String getJobName() {
        return this.jobName;
    }

    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public JobMode getJobMode() {
        return this.jobMode;
    }

    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public void registerPlugin(List<URL> list) {
        list.forEach(url -> {
            log.info("register plugins : {}", url);
        });
        ArrayList arrayList = new ArrayList();
        try {
            arrayList.add((Configuration) ((Method) ((Optional) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class, "getConfiguration", new Class[0]))).orElseThrow(() -> {
                return new RuntimeException("can't find method: getConfiguration");
            })).invoke(this.environment, new Object[0]));
            arrayList.forEach(configuration -> {
                List list2 = (List) configuration.get(PipelineOptions.JARS);
                if (list2 == null) {
                    list2 = new ArrayList();
                }
                list2.addAll((Collection) list.stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.toList()));
                configuration.set(PipelineOptions.JARS, list2.stream().distinct().collect(Collectors.toList()));
                List list3 = (List) configuration.get(PipelineOptions.CLASSPATHS);
                if (list3 == null) {
                    list3 = new ArrayList();
                }
                list3.addAll((Collection) list.stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.toList()));
                configuration.set(PipelineOptions.CLASSPATHS, list3.stream().distinct().collect(Collectors.toList()));
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
