package org.apache.beam.runners.flink;

import java.net.URL;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkExecutionEnvironments.class */
public class FlinkExecutionEnvironments {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class);

    public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions, List<String> list) {
        return createBatchExecutionEnvironment(flinkPipelineOptions, list, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions, List<String> list, @Nullable String str) {
        LocalEnvironment createRemoteEnvironment;
        LOG.info("Creating a Batch Execution Environment.");
        String stripHttpSchema = stripHttpSchema(flinkPipelineOptions.getFlinkMaster());
        Configuration flinkConfiguration = getFlinkConfiguration(str);
        if ("[local]".equals(stripHttpSchema)) {
            setManagedMemoryByFraction(flinkConfiguration);
            createRemoteEnvironment = ExecutionEnvironment.createLocalEnvironment(flinkConfiguration);
        } else if ("[collection]".equals(stripHttpSchema)) {
            createRemoteEnvironment = new CollectionEnvironment();
        } else if (FlinkPipelineOptions.AUTO.equals(stripHttpSchema)) {
            createRemoteEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        } else {
            HostAndPort withDefaultPort = HostAndPort.fromString(stripHttpSchema).withDefaultPort(flinkConfiguration.getInteger(RestOptions.PORT));
            flinkConfiguration.setInteger(RestOptions.PORT, withDefaultPort.getPort());
            createRemoteEnvironment = ExecutionEnvironment.createRemoteEnvironment(withDefaultPort.getHost(), withDefaultPort.getPort(), flinkConfiguration, (String[]) list.toArray(new String[list.size()]));
            LOG.info("Using Flink Master URL {}:{}.", withDefaultPort.getHost(), Integer.valueOf(withDefaultPort.getPort()));
        }
        createRemoteEnvironment.getConfig().setExecutionMode(ExecutionMode.valueOf(flinkPipelineOptions.getExecutionModeForBatch()));
        if (flinkPipelineOptions.getParallelism().intValue() != -1 && !(createRemoteEnvironment instanceof CollectionEnvironment)) {
            createRemoteEnvironment.setParallelism(flinkPipelineOptions.getParallelism().intValue());
        }
        int determineParallelism = createRemoteEnvironment instanceof CollectionEnvironment ? 1 : determineParallelism(flinkPipelineOptions.getParallelism().intValue(), createRemoteEnvironment.getParallelism(), flinkConfiguration);
        createRemoteEnvironment.setParallelism(determineParallelism);
        flinkPipelineOptions.setParallelism(Integer.valueOf(determineParallelism));
        if (flinkPipelineOptions.getObjectReuse().booleanValue()) {
            createRemoteEnvironment.getConfig().enableObjectReuse();
        } else {
            createRemoteEnvironment.getConfig().disableObjectReuse();
        }
        applyLatencyTrackingInterval(createRemoteEnvironment.getConfig(), flinkPipelineOptions);
        return createRemoteEnvironment;
    }

    public static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions, List<String> list) {
        return createStreamExecutionEnvironment(flinkPipelineOptions, list, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions, List<String> list, @Nullable String str) {
        LocalStreamEnvironment remoteStreamEnvironment;
        LOG.info("Creating a Streaming Environment.");
        String stripHttpSchema = stripHttpSchema(flinkPipelineOptions.getFlinkMaster());
        Configuration flinkConfiguration = getFlinkConfiguration(str);
        if ("[local]".equals(stripHttpSchema)) {
            setManagedMemoryByFraction(flinkConfiguration);
            remoteStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.getDefaultLocalParallelism(), flinkConfiguration);
        } else if (FlinkPipelineOptions.AUTO.equals(stripHttpSchema)) {
            remoteStreamEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        } else {
            HostAndPort withDefaultPort = HostAndPort.fromString(stripHttpSchema).withDefaultPort(flinkConfiguration.getInteger(RestOptions.PORT));
            flinkConfiguration.setInteger(RestOptions.PORT, withDefaultPort.getPort());
            remoteStreamEnvironment = new RemoteStreamEnvironment(withDefaultPort.getHost(), withDefaultPort.getPort(), flinkConfiguration, (String[]) list.toArray(new String[list.size()]), (URL[]) null, flinkPipelineOptions.getSavepointPath() != null ? SavepointRestoreSettings.forPath(flinkPipelineOptions.getSavepointPath(), flinkPipelineOptions.getAllowNonRestoredState().booleanValue()) : SavepointRestoreSettings.none());
            LOG.info("Using Flink Master URL {}:{}.", withDefaultPort.getHost(), Integer.valueOf(withDefaultPort.getPort()));
        }
        int determineParallelism = determineParallelism(flinkPipelineOptions.getParallelism().intValue(), remoteStreamEnvironment.getParallelism(), flinkConfiguration);
        remoteStreamEnvironment.setParallelism(determineParallelism);
        if (flinkPipelineOptions.getMaxParallelism().intValue() > 0) {
            remoteStreamEnvironment.setMaxParallelism(flinkPipelineOptions.getMaxParallelism().intValue());
        }
        flinkPipelineOptions.setParallelism(Integer.valueOf(determineParallelism));
        if (flinkPipelineOptions.getObjectReuse().booleanValue()) {
            remoteStreamEnvironment.getConfig().enableObjectReuse();
        } else {
            remoteStreamEnvironment.getConfig().disableObjectReuse();
        }
        remoteStreamEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        int intValue = flinkPipelineOptions.getNumberOfExecutionRetries().intValue();
        if (intValue != -1) {
            remoteStreamEnvironment.setNumberOfExecutionRetries(intValue);
        }
        long longValue = flinkPipelineOptions.getExecutionRetryDelay().longValue();
        if (longValue != -1) {
            remoteStreamEnvironment.getConfig().setExecutionRetryDelay(longValue);
        }
        long longValue2 = flinkPipelineOptions.getCheckpointingInterval().longValue();
        if (longValue2 != -1) {
            if (longValue2 < 1) {
                throw new IllegalArgumentException("The checkpoint interval must be positive");
            }
            remoteStreamEnvironment.enableCheckpointing(longValue2, CheckpointingMode.valueOf(flinkPipelineOptions.getCheckpointingMode()));
            if (flinkPipelineOptions.getCheckpointTimeoutMillis().longValue() != -1) {
                remoteStreamEnvironment.getCheckpointConfig().setCheckpointTimeout(flinkPipelineOptions.getCheckpointTimeoutMillis().longValue());
            }
            boolean booleanValue = flinkPipelineOptions.isExternalizedCheckpointsEnabled().booleanValue();
            boolean booleanValue2 = flinkPipelineOptions.getRetainExternalizedCheckpointsOnCancellation().booleanValue();
            if (booleanValue) {
                remoteStreamEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(booleanValue2 ? CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION : CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
            }
            long longValue3 = flinkPipelineOptions.getMinPauseBetweenCheckpoints().longValue();
            if (longValue3 != -1) {
                remoteStreamEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(longValue3);
            }
            remoteStreamEnvironment.getCheckpointConfig().setFailOnCheckpointingErrors(flinkPipelineOptions.getFailOnCheckpointingErrors().booleanValue());
            remoteStreamEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(flinkPipelineOptions.getNumConcurrentCheckpoints());
        }
        applyLatencyTrackingInterval(remoteStreamEnvironment.getConfig(), flinkPipelineOptions);
        if (flinkPipelineOptions.getAutoWatermarkInterval() != null) {
            remoteStreamEnvironment.getConfig().setAutoWatermarkInterval(flinkPipelineOptions.getAutoWatermarkInterval().longValue());
        }
        if (flinkPipelineOptions.getStateBackendFactory() != null) {
            remoteStreamEnvironment.setStateBackend(((FlinkStateBackendFactory) InstanceBuilder.ofType(FlinkStateBackendFactory.class).fromClass(flinkPipelineOptions.getStateBackendFactory()).build()).createStateBackend(flinkPipelineOptions));
        }
        return remoteStreamEnvironment;
    }

    private static String stripHttpSchema(String str) {
        return str.trim().replaceFirst("^http[s]?://", "");
    }

    private static int determineParallelism(int i, int i2, Configuration configuration) {
        if (i > 0) {
            return i;
        }
        if (i2 > 0) {
            return i2;
        }
        int integer = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM.key(), -1);
        if (integer > 0) {
            return integer;
        }
        LOG.warn("No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism");
        return 1;
    }

    private static Configuration getFlinkConfiguration(@Nullable String str) {
        return str == null ? GlobalConfiguration.loadConfiguration() : GlobalConfiguration.loadConfiguration(str);
    }

    private static void applyLatencyTrackingInterval(ExecutionConfig executionConfig, FlinkPipelineOptions flinkPipelineOptions) {
        executionConfig.setLatencyTrackingInterval(flinkPipelineOptions.getLatencyTrackingInterval().longValue());
    }

    private static void setManagedMemoryByFraction(Configuration configuration) {
        if (configuration.containsKey("taskmanager.memory.managed.size")) {
            return;
        }
        configuration.setString("taskmanager.memory.managed.size", String.valueOf(((float) EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()) * configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION)));
    }
}
