/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkStateBackendFactory;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkExecutionEnvironments {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class);

    public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options, List<String> filesToStage) {
        return FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, filesToStage, null);
    }

    static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {
        LocalEnvironment flinkBatchEnv;
        LOG.info("Creating a Batch Execution Environment.");
        String flinkMasterHostPort = FlinkExecutionEnvironments.stripHttpSchema(options.getFlinkMaster());
        Configuration flinkConfiguration = FlinkExecutionEnvironments.getFlinkConfiguration(confDir);
        if ("[local]".equals(flinkMasterHostPort)) {
            FlinkExecutionEnvironments.setManagedMemoryByFraction(flinkConfiguration);
            flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment((Configuration)flinkConfiguration);
        } else if ("[collection]".equals(flinkMasterHostPort)) {
            flinkBatchEnv = new CollectionEnvironment();
        } else if ("[auto]".equals(flinkMasterHostPort)) {
            flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
        } else {
            int defaultPort = flinkConfiguration.getInteger(RestOptions.PORT);
            HostAndPort hostAndPort = HostAndPort.fromString((String)flinkMasterHostPort).withDefaultPort(defaultPort);
            flinkConfiguration.setInteger(RestOptions.PORT, hostAndPort.getPort());
            flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment((String)hostAndPort.getHost(), (int)hostAndPort.getPort(), (Configuration)flinkConfiguration, (String[])filesToStage.toArray(new String[filesToStage.size()]));
            LOG.info("Using Flink Master URL {}:{}.", (Object)hostAndPort.getHost(), (Object)hostAndPort.getPort());
        }
        flinkBatchEnv.getConfig().setExecutionMode(ExecutionMode.valueOf((String)options.getExecutionModeForBatch()));
        if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
            flinkBatchEnv.setParallelism(options.getParallelism().intValue());
        }
        int parallelism = flinkBatchEnv instanceof CollectionEnvironment ? 1 : FlinkExecutionEnvironments.determineParallelism(options.getParallelism(), flinkBatchEnv.getParallelism(), flinkConfiguration);
        flinkBatchEnv.setParallelism(parallelism);
        options.setParallelism(parallelism);
        if (options.getObjectReuse().booleanValue()) {
            flinkBatchEnv.getConfig().enableObjectReuse();
        } else {
            flinkBatchEnv.getConfig().disableObjectReuse();
        }
        FlinkExecutionEnvironments.applyLatencyTrackingInterval(flinkBatchEnv.getConfig(), options);
        return flinkBatchEnv;
    }

    public static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions options, List<String> filesToStage) {
        return FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, filesToStage, null);
    }

    @VisibleForTesting
    static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {
        long checkpointInterval;
        long retryDelay;
        LocalStreamEnvironment flinkStreamEnv;
        LOG.info("Creating a Streaming Environment.");
        String masterUrl = FlinkExecutionEnvironments.stripHttpSchema(options.getFlinkMaster());
        Configuration flinkConfiguration = FlinkExecutionEnvironments.getFlinkConfiguration(confDir);
        if ("[local]".equals(masterUrl)) {
            FlinkExecutionEnvironments.setManagedMemoryByFraction(flinkConfiguration);
            flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment((int)StreamExecutionEnvironment.getDefaultLocalParallelism(), (Configuration)flinkConfiguration);
        } else if ("[auto]".equals(masterUrl)) {
            flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        } else {
            int defaultPort = flinkConfiguration.getInteger(RestOptions.PORT);
            HostAndPort hostAndPort = HostAndPort.fromString((String)masterUrl).withDefaultPort(defaultPort);
            flinkConfiguration.setInteger(RestOptions.PORT, hostAndPort.getPort());
            SavepointRestoreSettings savepointRestoreSettings = options.getSavepointPath() != null ? SavepointRestoreSettings.forPath((String)options.getSavepointPath(), (boolean)options.getAllowNonRestoredState()) : SavepointRestoreSettings.none();
            flinkStreamEnv = new RemoteStreamEnvironment(hostAndPort.getHost(), hostAndPort.getPort(), flinkConfiguration, filesToStage.toArray(new String[filesToStage.size()]), null, savepointRestoreSettings);
            LOG.info("Using Flink Master URL {}:{}.", (Object)hostAndPort.getHost(), (Object)hostAndPort.getPort());
        }
        int parallelism = FlinkExecutionEnvironments.determineParallelism(options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfiguration);
        flinkStreamEnv.setParallelism(parallelism);
        if (options.getMaxParallelism() > 0) {
            flinkStreamEnv.setMaxParallelism(options.getMaxParallelism().intValue());
        }
        options.setParallelism(parallelism);
        if (options.getObjectReuse().booleanValue()) {
            flinkStreamEnv.getConfig().enableObjectReuse();
        } else {
            flinkStreamEnv.getConfig().disableObjectReuse();
        }
        flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        int numRetries = options.getNumberOfExecutionRetries();
        if (numRetries != -1) {
            flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
        }
        if ((retryDelay = options.getExecutionRetryDelay().longValue()) != -1L) {
            flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
        }
        if ((checkpointInterval = options.getCheckpointingInterval().longValue()) != -1L) {
            long minPauseBetweenCheckpoints;
            if (checkpointInterval < 1L) {
                throw new IllegalArgumentException("The checkpoint interval must be positive");
            }
            flinkStreamEnv.enableCheckpointing(checkpointInterval, CheckpointingMode.valueOf((String)options.getCheckpointingMode()));
            if (options.getShutdownSourcesAfterIdleMs() == -1L) {
                options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE);
            }
            if (options.getCheckpointTimeoutMillis() != -1L) {
                flinkStreamEnv.getCheckpointConfig().setCheckpointTimeout(options.getCheckpointTimeoutMillis().longValue());
            }
            boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
            boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
            if (externalizedCheckpoint) {
                flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(retainOnCancellation ? CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION : CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
            }
            if ((minPauseBetweenCheckpoints = options.getMinPauseBetweenCheckpoints().longValue()) != -1L) {
                flinkStreamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
            }
            boolean failOnCheckpointingErrors = options.getFailOnCheckpointingErrors();
            flinkStreamEnv.getCheckpointConfig().setFailOnCheckpointingErrors(failOnCheckpointingErrors);
            flinkStreamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(options.getNumConcurrentCheckpoints());
        } else if (options.getShutdownSourcesAfterIdleMs() == -1L) {
            options.setShutdownSourcesAfterIdleMs(0L);
        }
        FlinkExecutionEnvironments.applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
        if (options.getAutoWatermarkInterval() != null) {
            flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval().longValue());
        }
        if (options.getStateBackendFactory() != null) {
            StateBackend stateBackend = ((FlinkStateBackendFactory)InstanceBuilder.ofType(FlinkStateBackendFactory.class).fromClass(options.getStateBackendFactory()).build()).createStateBackend(options);
            flinkStreamEnv.setStateBackend(stateBackend);
        }
        return flinkStreamEnv;
    }

    private void configureCheckpointingOptions() {
    }

    private static String stripHttpSchema(String url) {
        return url.trim().replaceFirst("^http[s]?://", "");
    }

    private static int determineParallelism(int pipelineOptionsParallelism, int envParallelism, Configuration configuration) {
        if (pipelineOptionsParallelism > 0) {
            return pipelineOptionsParallelism;
        }
        if (envParallelism > 0) {
            return envParallelism;
        }
        int flinkConfigParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM.key(), -1);
        if (flinkConfigParallelism > 0) {
            return flinkConfigParallelism;
        }
        LOG.warn("No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism");
        return 1;
    }

    private static Configuration getFlinkConfiguration(@Nullable String flinkConfDir) {
        return flinkConfDir == null ? GlobalConfiguration.loadConfiguration() : GlobalConfiguration.loadConfiguration((String)flinkConfDir);
    }

    private static void applyLatencyTrackingInterval(ExecutionConfig config, FlinkPipelineOptions options) {
        long latencyTrackingInterval = options.getLatencyTrackingInterval();
        config.setLatencyTrackingInterval(latencyTrackingInterval);
    }

    private static void setManagedMemoryByFraction(Configuration config) {
        if (!config.containsKey("taskmanager.memory.managed.size")) {
            float managedMemoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
            long freeHeapMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
            long managedMemorySize = (long)((float)freeHeapMemory * managedMemoryFraction);
            config.setString("taskmanager.memory.managed.size", String.valueOf(managedMemorySize));
        }
    }
}

