package org.apache.beam.runners.samza.translation;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaExecutionEnvironment;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.container.BeamContainerRunner;
import org.apache.beam.runners.samza.container.BeamJobCoordinatorRunner;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigLoaderFactory;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.job.yarn.YarnJobFactory;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.runtime.RemoteApplicationRunner;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/samza/translation/ConfigBuilder.class */
/**
 * Builds the Samza {@link Config} for a pipeline described by {@link SamzaPipelineOptions}.
 *
 * <p>Callers may seed entries through {@link #put} / {@link #putAll}; {@link #build()} then layers
 * the system defaults, the user supplied config (config file plus config override) and the job
 * identity entries on top, in that order, so later layers win on key collisions.
 */
public class ConfigBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(ConfigBuilder.class);
    private static final String BEAM_STORE_FACTORY = "stores.beamStore.factory";
    private static final String APP_RUNNER_CLASS = "app.runner.class";
    private static final String YARN_PACKAGE_PATH = "yarn.package.path";
    private static final String JOB_FACTORY_CLASS = "job.factory.class";
    private static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
    private static final String JOB_COORDINATOR_ZK_CONNECT = "job.coordinator.zk.connect";

    /** Accumulated config entries; mutated by {@link #put}, {@link #putAll} and {@link #build()}. */
    private final Map<String, String> config = new HashMap<>();
    private final SamzaPipelineOptions options;

    /**
     * @param samzaPipelineOptions pipeline options the config is derived from; note that
     *     {@link #build()} mutates these options
     */
    public ConfigBuilder(SamzaPipelineOptions samzaPipelineOptions) {
        this.options = samzaPipelineOptions;
    }

    /** Adds (or replaces) a single config entry. */
    public void put(String str, String str2) {
        this.config.put(str, str2);
    }

    /** Adds (or replaces) all entries of {@code map}. */
    public void putAll(Map<String, String> map) {
        this.config.putAll(map);
    }

    /**
     * Assembles and validates the final config.
     *
     * <p>Side effects: the accumulated entries of this builder are updated in place, and the
     * config override of the pipeline options is replaced with an empty map.
     *
     * @return a {@link MapConfig} holding the accumulated entries
     * @throws RuntimeException wrapping any exception raised while loading the user config,
     *     serializing the options or validating the result
     */
    public Config build() {
        try {
            // Layering order matters: system defaults first, then user config overrides them.
            this.config.putAll(createSystemConfig(this.options, this.config));
            this.config.putAll(createUserConfig(this.options));

            final String jobName = this.options.getJobName();
            final String jobInstance = this.options.getJobInstance();
            this.config.put("app.name", jobName);
            this.config.put("app.id", jobInstance);
            this.config.put("job.name", jobName);
            this.config.put("job.id", jobInstance);
            this.config.put("task.max.concurrency", String.valueOf(this.options.getMaxBundleSize()));

            // The overrides were already merged above; reset them so the options serialized
            // below carry an empty override map.
            this.options.setConfigOverride(new HashMap<>());
            this.config.put("beamPipelineOptions", Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(this.options)));

            validateConfigs(this.options, this.config);
            return new MapConfig(this.config);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Collects the user supplied config: the content of the config file (when a path is set),
     * overlaid with the config override map (when non-null).
     *
     * @throws Exception if the config loader factory cannot be instantiated or the loader fails
     * @throws IllegalArgumentException if a properties config file is configured but missing
     */
    private static Map<String, String> createUserConfig(SamzaPipelineOptions samzaPipelineOptions) throws Exception {
        final Map<String, String> userConfig = new HashMap<>();
        final String configFilePath = samzaPipelineOptions.getConfigFilePath();
        if (StringUtils.isNoneEmpty(configFilePath)) {
            LOG.info("configFilePath: {}", configFilePath);
            final MapConfig loaderConfig = new MapConfig(Collections.singletonMap("path", configFilePath));
            final ConfigLoaderFactory loaderFactory = samzaPipelineOptions.getConfigLoaderFactory().getDeclaredConstructor().newInstance();
            LOG.info("configLoaderFactory: {}", loaderFactory.getClass().getName());
            // Only the properties loader is checked against the local file system here.
            if (loaderFactory instanceof PropertiesConfigLoaderFactory) {
                Preconditions.checkArgument(new File(configFilePath).exists(), "Config file %s does not exist", configFilePath);
            }
            userConfig.putAll(loaderFactory.getLoader(loaderConfig).getConfig());
        }
        final Map<String, String> configOverride = samzaPipelineOptions.getConfigOverride();
        if (configOverride != null) {
            userConfig.putAll(configOverride);
        }
        return userConfig;
    }

    /**
     * Checks the entries required for a standalone deployment.
     *
     * @throws IllegalArgumentException if a required entry is missing or has an unexpected value
     */
    private static void validateZKStandAloneRun(Map<String, String> map) {
        final String localRunner = LocalApplicationRunner.class.getName();
        final String zkCoordinatorFactory = ZkJobCoordinatorFactory.class.getName();
        Preconditions.checkArgument(map.containsKey(APP_RUNNER_CLASS), "Config %s not found for %s Deployment", APP_RUNNER_CLASS, SamzaExecutionEnvironment.STANDALONE);
        // Constant-first equals: a key mapped to null fails the check instead of raising an NPE.
        Preconditions.checkArgument(localRunner.equals(map.get(APP_RUNNER_CLASS)), "Config %s must be set to %s for %s Deployment", APP_RUNNER_CLASS, localRunner, SamzaExecutionEnvironment.STANDALONE);
        Preconditions.checkArgument(map.containsKey(JOB_COORDINATOR_FACTORY), "Config %s not found for %s Deployment", JOB_COORDINATOR_FACTORY, SamzaExecutionEnvironment.STANDALONE);
        Preconditions.checkArgument(zkCoordinatorFactory.equals(map.get(JOB_COORDINATOR_FACTORY)), "Config %s must be set to %s for %s Deployment", JOB_COORDINATOR_FACTORY, zkCoordinatorFactory, SamzaExecutionEnvironment.STANDALONE);
        Preconditions.checkArgument(map.containsKey(JOB_COORDINATOR_ZK_CONNECT), "Config %s not found for %s Deployment", JOB_COORDINATOR_ZK_CONNECT, SamzaExecutionEnvironment.STANDALONE);
    }

    /**
     * Checks the entries required for a YARN deployment. The app runner class may be absent; when
     * present it must be one of the three accepted runner classes.
     *
     * @throws IllegalArgumentException if a required entry is missing or has an unexpected value
     */
    private static void validateYarnRun(Map<String, String> map) {
        Preconditions.checkArgument(map.containsKey(YARN_PACKAGE_PATH), "Config %s not found for %s Deployment", YARN_PACKAGE_PATH, SamzaExecutionEnvironment.YARN);

        final String appRunner = map.get(APP_RUNNER_CLASS);
        final String coordinatorRunner = BeamJobCoordinatorRunner.class.getName();
        final String remoteRunner = RemoteApplicationRunner.class.getName();
        final String containerRunner = BeamContainerRunner.class.getName();
        final boolean runnerAccepted = appRunner == null
                || coordinatorRunner.equals(appRunner)
                || remoteRunner.equals(appRunner)
                || containerRunner.equals(appRunner);
        Preconditions.checkArgument(runnerAccepted, "Config %s must be set to %s for %s Deployment, but found %s", APP_RUNNER_CLASS, String.format("[%s, %s or %s]", coordinatorRunner, remoteRunner, containerRunner), SamzaExecutionEnvironment.YARN, appRunner);

        Preconditions.checkArgument(map.containsKey(JOB_FACTORY_CLASS), "Config %s not found for %s Deployment", JOB_FACTORY_CLASS, SamzaExecutionEnvironment.YARN);
    }

    /**
     * Returns the default entries for a local run. The {@code app.run.id} entry is generated on
     * every call from the current time and a random UUID prefix, so results differ between calls.
     */
    @VisibleForTesting
    public static Map<String, String> localRunConfig() {
        final String runId = System.currentTimeMillis() + "-" + UUID.randomUUID().toString().substring(0, 8);
        return ImmutableMap.<String, String>builder()
                .put(APP_RUNNER_CLASS, LocalApplicationRunner.class.getName())
                .put(JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName())
                .put("task.name.grouper.factory", SingleContainerGrouperFactory.class.getName())
                .put("task.commit.ms", "-1")
                .put("processor.id", "1")
                .put("app.run.id", runId)
                .build();
    }

    /** Returns the default entries for a YARN run. */
    public static Map<String, String> yarnRunConfig() {
        return ImmutableMap.<String, String>builder()
                .put(APP_RUNNER_CLASS, RemoteApplicationRunner.class.getName())
                .put(JOB_FACTORY_CLASS, YarnJobFactory.class.getName())
                .build();
    }

    /** Returns the default entries for a standalone run. */
    public static Map<String, String> standAloneRunConfig() {
        return ImmutableMap.<String, String>builder()
                .put(APP_RUNNER_CLASS, LocalApplicationRunner.class.getName())
                .put(JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName())
                .build();
    }

    /**
     * Builds the system level defaults: serde registration for the Beam store, the store factory
     * fallback and the entries specific to the selected execution environment.
     *
     * <p>Side effect: when {@code map} has no store factory entry, state durability is switched
     * off on the options and the in-memory store factory is used.
     *
     * @param map the entries configured so far; only consulted for the store factory key
     */
    private static Map<String, String> createSystemConfig(SamzaPipelineOptions samzaPipelineOptions, Map<String, String> map) {
        final ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
                .put("stores.beamStore.key.serde", "byteArraySerde")
                .put("stores.beamStore.msg.serde", "stateValueSerde")
                .put("serializers.registry.stateValueSerde.class", SamzaStoreStateInternals.StateValueSerdeFactory.class.getName())
                .put("serializers.registry.byteArraySerde.class", SamzaStoreStateInternals.ByteArraySerdeFactory.class.getName());

        if (!map.containsKey(BEAM_STORE_FACTORY)) {
            samzaPipelineOptions.setStateDurable(false);
            builder.put(BEAM_STORE_FACTORY, "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory");
        }

        LOG.info("Execution environment is {}", samzaPipelineOptions.getSamzaExecutionEnvironment());
        switch (samzaPipelineOptions.getSamzaExecutionEnvironment()) {
            case YARN:
                builder.putAll(yarnRunConfig());
                break;
            case STANDALONE:
                builder.putAll(standAloneRunConfig());
                break;
            default:
                // Every other environment falls back to the local run defaults.
                builder.putAll(localRunConfig());
                break;
        }

        builder.put("samza.li.task.wrapper.enabled", "false");
        return builder.build();
    }

    /**
     * Returns the entries that back the Beam store with RocksDB. When state durability is enabled
     * on the options, a changelog topic and host affinity are configured as well.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private; it is
     * kept public so existing callers keep compiling.
     */
    public static Map<String, String> createRocksDBStoreConfig(SamzaPipelineOptions samzaPipelineOptions) {
        final ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
                .put(BEAM_STORE_FACTORY, "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
                .put("stores.beamStore.rocksdb.compression", "lz4");
        if (samzaPipelineOptions.getStateDurable()) {
            LOG.info("stateDurable is enabled");
            builder.put("stores.beamStore.changelog", getChangelogTopic(samzaPipelineOptions, "beamStore"));
            builder.put("job.host-affinity.enabled", "true");
        }
        return builder.build();
    }

    /**
     * Runs the validation matching the selected execution environment; environments other than
     * YARN and STANDALONE are not validated.
     */
    private static void validateConfigs(SamzaPipelineOptions samzaPipelineOptions, Map<String, String> map) {
        switch (samzaPipelineOptions.getSamzaExecutionEnvironment()) {
            case YARN:
                validateYarnRun(map);
                break;
            case STANDALONE:
                validateZKStandAloneRun(map);
                break;
            default:
                break;
        }
    }

    /**
     * Returns the changelog topic name for a store, formatted as
     * {@code <jobName>-<jobInstance>-<store>-changelog}.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private; it is
     * kept public so existing callers keep compiling.
     *
     * @param str the store name
     */
    public static String getChangelogTopic(SamzaPipelineOptions samzaPipelineOptions, String str) {
        return String.format("%s-%s-%s-changelog", samzaPipelineOptions.getJobName(), samzaPipelineOptions.getJobInstance(), str);
    }
}
