package org.apache.flink.statefun.flink.harness;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator;
import org.apache.flink.statefun.flink.core.StatefulFunctionsJob;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseProvider;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.spi.Modules;
import org.apache.flink.statefun.flink.harness.io.ConsumingEgressSpec;
import org.apache.flink.statefun.flink.harness.io.SerializableConsumer;
import org.apache.flink.statefun.flink.harness.io.SerializableSupplier;
import org.apache.flink.statefun.flink.harness.io.SupplyingIngressSpec;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/flink/statefun/flink/harness/Harness.class */
/**
 * Fluent builder that collects Flink settings, global configurations and ingress/egress
 * overrides, and then launches a Stateful Functions job on a local Flink stream environment.
 *
 * <p>Each {@code withXxx} method records a setting and returns {@code this} so calls can be
 * chained; nothing is executed until {@link #start()} is invoked.
 */
public class Harness {
    /** Key/value pairs handed to the Stateful Functions config as global configurations. */
    private final Map<String, String> globalConfigurations = new HashMap<>();

    /** Ingress specs that replace the ones registered by the modules found on the class path. */
    private final Map<IngressIdentifier<?>, IngressSpec<?>> overrideIngress = new HashMap<>();

    /** Egress specs that replace the ones registered by the modules found on the class path. */
    private final Map<EgressIdentifier<?>, EgressSpec<?>> overrideEgress = new HashMap<>();

    /** Flink configuration that is populated by the builder methods and used by {@link #start()}. */
    private final Configuration flinkConfig = new Configuration();

    /**
     * Universe provider that loads the modules from the class path and then applies the
     * ingress/egress overrides collected by the enclosing {@link Harness}.
     */
    private static final class HarnessProvider implements StatefulFunctionsUniverseProvider {
        private static final long serialVersionUID = 1L;

        private final Map<IngressIdentifier<?>, IngressSpec<?>> ingressToReplace;
        private final Map<EgressIdentifier<?>, EgressSpec<?>> egressToReplace;

        /**
         * @param ingressToReplace ingress specs to put into the universe, keyed by identifier
         * @param egressToReplace egress specs to put into the universe, keyed by identifier
         */
        HarnessProvider(
                Map<IngressIdentifier<?>, IngressSpec<?>> ingressToReplace,
                Map<EgressIdentifier<?>, EgressSpec<?>> egressToReplace) {
            this.ingressToReplace = ingressToReplace;
            this.egressToReplace = egressToReplace;
        }

        /**
         * Builds the universe from the modules on the class path and overlays the overrides.
         *
         * @param classLoader not used by this implementation
         * @param configuration configuration passed to the module loader
         * @return the universe with every override ingress/egress put in place
         */
        @Override
        public StatefulFunctionsUniverse get(ClassLoader classLoader, StatefulFunctionsConfig configuration) {
            StatefulFunctionsUniverse universe =
                    Modules.loadFromClassPath().createStatefulFunctionsUniverse(configuration);
            // Overlay the harness-supplied specs; without this step the overrides registered on
            // the Harness would be silently ignored and the module-defined I/O would be used.
            universe.ingress().putAll(this.ingressToReplace);
            universe.egress().putAll(this.egressToReplace);
            return universe;
        }
    }

    /**
     * Consumer that writes every received element to standard output.
     *
     * @param <T> type of the consumed elements
     */
    private static final class PrintingConsumer<T> implements SerializableConsumer<T> {
        private static final long serialVersionUID = 1L;

        private PrintingConsumer() {
        }

        /** Prints {@code t} to {@code System.out}. */
        @Override
        public void accept(T t) {
            System.out.println(t);
        }
    }

    /**
     * Overrides the ingress with the given identifier by one that is fed from a supplier.
     *
     * @param ingressIdentifier identifier of the ingress to override; must not be {@code null}
     * @param serializableSupplier source of the ingress elements; must not be {@code null}
     * @param <T> type of the ingress elements
     * @return this harness
     * @throws NullPointerException if any argument is {@code null}
     */
    public <T> Harness withSupplyingIngress(IngressIdentifier<T> ingressIdentifier, SerializableSupplier<T> serializableSupplier) {
        Objects.requireNonNull(ingressIdentifier, "ingressIdentifier");
        Objects.requireNonNull(serializableSupplier, "serializableSupplier");
        // NOTE(review): the trailing 0L is presumably a delay between supplied elements —
        // confirm against SupplyingIngressSpec.
        this.overrideIngress.put(
                ingressIdentifier, new SupplyingIngressSpec<>(ingressIdentifier, serializableSupplier, 0L));
        return this;
    }

    /**
     * Overrides the ingress with the given identifier by one backed by a Flink source function.
     *
     * @param ingressIdentifier identifier of the ingress to override; must not be {@code null}
     * @param sourceFunction Flink source function to use; must not be {@code null}
     * @param <T> type of the ingress elements
     * @return this harness
     * @throws NullPointerException if any argument is {@code null}
     */
    public <T> Harness withFlinkSourceFunction(IngressIdentifier<T> ingressIdentifier, SourceFunction<T> sourceFunction) {
        Objects.requireNonNull(ingressIdentifier, "ingressIdentifier");
        Objects.requireNonNull(sourceFunction, "sourceFunction");
        this.overrideIngress.put(ingressIdentifier, new SourceFunctionSpec<>(ingressIdentifier, sourceFunction));
        return this;
    }

    /**
     * Overrides the egress with the given identifier by one that forwards to a consumer.
     *
     * @param egressIdentifier identifier of the egress to override; must not be {@code null}
     * @param serializableConsumer receiver of the egress elements; must not be {@code null}
     * @param <T> type of the egress elements
     * @return this harness
     * @throws NullPointerException if any argument is {@code null}
     */
    public <T> Harness withConsumingEgress(EgressIdentifier<T> egressIdentifier, SerializableConsumer<T> serializableConsumer) {
        Objects.requireNonNull(egressIdentifier, "egressIdentifier");
        Objects.requireNonNull(serializableConsumer, "serializableConsumer");
        this.overrideEgress.put(egressIdentifier, new ConsumingEgressSpec<>(egressIdentifier, serializableConsumer));
        return this;
    }

    /**
     * Overrides the egress with the given identifier by one that prints every element to
     * standard output.
     *
     * @param egressIdentifier identifier of the egress to override; must not be {@code null}
     * @param <T> type of the egress elements
     * @return this harness
     * @throws NullPointerException if {@code egressIdentifier} is {@code null}
     */
    public <T> Harness withPrintingEgress(EgressIdentifier<T> egressIdentifier) {
        return withConsumingEgress(egressIdentifier, new PrintingConsumer<>());
    }

    /**
     * Sets the user message serializer option to {@link MessageFactoryType#WITH_KRYO_PAYLOADS}.
     *
     * @return this harness
     */
    public Harness withKryoMessageSerializer() {
        this.flinkConfig.set(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
        return this;
    }

    /**
     * Sets the Flink job name option.
     *
     * @param flinkJobName value stored under {@link StatefulFunctionsConfig#FLINK_JOB_NAME}
     * @return this harness
     */
    public Harness withFlinkJobName(String flinkJobName) {
        this.flinkConfig.set(StatefulFunctionsConfig.FLINK_JOB_NAME, flinkJobName);
        return this;
    }

    /**
     * Stores an arbitrary string entry in the Flink configuration.
     *
     * @param key configuration key
     * @param value configuration value
     * @return this harness
     */
    public Harness withConfiguration(String key, String value) {
        this.flinkConfig.setString(key, value);
        return this;
    }

    /**
     * Sets the default parallelism in the Flink configuration. When this is never called,
     * {@link #start()} falls back to the number of available processors.
     *
     * @param parallelism value stored under {@link CoreOptions#DEFAULT_PARALLELISM}
     * @return this harness
     */
    public Harness withParallelism(int parallelism) {
        this.flinkConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        return this;
    }

    /**
     * Records a global configuration entry that is added to the Stateful Functions config on
     * {@link #start()}. A later call with the same key replaces the earlier value.
     *
     * @param key global configuration key
     * @param value global configuration value
     * @return this harness
     */
    public Harness withGlobalConfiguration(String key, String value) {
        this.globalConfigurations.put(key, value);
        return this;
    }

    /**
     * Sets the savepoint path option in the Flink configuration.
     *
     * @param savepointLocation value stored under {@link SavepointConfigOptions#SAVEPOINT_PATH};
     *     must not be {@code null}
     * @return this harness
     * @throws NullPointerException if {@code savepointLocation} is {@code null}
     */
    public Harness withSavepointLocation(String savepointLocation) {
        Objects.requireNonNull(savepointLocation, "savepointLocation");
        this.flinkConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointLocation);
        return this;
    }

    /**
     * Creates a local stream environment from the collected Flink configuration and hands it,
     * together with the Stateful Functions config and the override provider, to
     * {@link StatefulFunctionsJob#main}.
     *
     * @throws Exception propagated from environment setup or from the job itself
     */
    public void start() throws Exception {
        configureStrictlyRequiredFlinkConfigs(this.flinkConfig);

        LocalStreamEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(getParallelism(this.flinkConfig), this.flinkConfig);
        env.configure(this.flinkConfig, Thread.currentThread().getContextClassLoader());

        StatefulFunctionsConfig stateFunConfig = StatefulFunctionsConfig.fromFlinkConfiguration(this.flinkConfig);
        stateFunConfig.addAllGlobalConfigurations(this.globalConfigurations);
        stateFunConfig.setProvider(new HarnessProvider(this.overrideIngress, this.overrideEgress));

        StatefulFunctionsJob.main(env, stateFunConfig);
    }

    /**
     * Returns the configured default parallelism, or the number of available processors when
     * the option is not present in {@code configuration}.
     */
    private static int getParallelism(Configuration configuration) {
        if (configuration.contains(CoreOptions.DEFAULT_PARALLELISM)) {
            return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
        }
        return Runtime.getRuntime().availableProcessors();
    }

    /**
     * Unconditionally writes the settings this harness always applies: the additional
     * parent-first class loader patterns (joined with {@code ";"}) and a maximum of one
     * concurrent checkpoint. Any values previously set for these two options are overwritten.
     */
    private static void configureStrictlyRequiredFlinkConfigs(Configuration configuration) {
        configuration.set(
                CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
                String.join(";", StatefulFunctionsConfigValidator.PARENT_FIRST_CLASSLOADER_PATTERNS));
        configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
    }
}
