package org.apache.flink.streaming.api.environment;

import com.esotericsoftware.kryo.Serializer;
import java.io.File;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.PrimitiveInputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextValueInputFormat;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;

/* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.class */
public abstract class StreamExecutionEnvironment {
    /** Job name constant; presumably used when a job is executed without an explicit name (TODO confirm in subclasses). */
    public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
    /** Parallelism applied by {@link #createLocalEnvironment()}; starts as the number of available processors. */
    private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
    /** Environment most recently created by one of the static factory methods; {@code null} until one of them assigns it. */
    protected static StreamExecutionEnvironment currentEnvironment;
    /** Buffer timeout, initially 100 (the unit is not stated in this file; presumably milliseconds, TODO confirm). */
    private long bufferTimeout = 100;
    /** Execution configuration that most setters of this class delegate to. */
    private ExecutionConfig config = new ExecutionConfig();
    /** Stream graph that is built up as sources are added and options are set on this environment. */
    protected StreamGraph streamGraph = new StreamGraph(this);

    /**
     * Gives access to the execution configuration held by this environment.
     *
     * @return the {@link ExecutionConfig} instance backing this environment
     */
    public ExecutionConfig getConfig() {
        return config;
    }

    /**
     * Legacy alias that simply forwards to {@link #setParallelism(int)}.
     *
     * @param i the parallelism to apply
     * @return whatever {@link #setParallelism(int)} returns
     * @deprecated call {@link #setParallelism(int)} directly
     */
    @Deprecated
    public StreamExecutionEnvironment setDegreeOfParallelism(int i) {
        final StreamExecutionEnvironment result = setParallelism(i);
        return result;
    }

    /**
     * Legacy alias that simply forwards to {@link #getParallelism()}.
     *
     * @return the value reported by {@link #getParallelism()}
     * @deprecated call {@link #getParallelism()} directly
     */
    @Deprecated
    public int getDegreeOfParallelism() {
        final int parallelism = getParallelism();
        return parallelism;
    }

    /**
     * Stores the given parallelism in the execution configuration.
     *
     * @param i the parallelism; must be 1 or greater
     * @return this environment, to allow call chaining
     * @throws IllegalArgumentException if {@code i} is smaller than 1
     */
    public StreamExecutionEnvironment setParallelism(int i) {
        if (i >= 1) {
            config.setParallelism(i);
            return this;
        }
        throw new IllegalArgumentException("parallelism must be at least one.");
    }

    /**
     * Reads the parallelism currently stored in the execution configuration.
     *
     * @return the configured parallelism
     */
    public int getParallelism() {
        return config.getParallelism();
    }

    /**
     * Remembers the buffer timeout for this environment.
     *
     * @param j the timeout; any value of -1 or above is accepted
     * @return this environment, to allow call chaining
     * @throws IllegalArgumentException if {@code j} is smaller than -1
     */
    public StreamExecutionEnvironment setBufferTimeout(long j) {
        if (j >= -1) {
            bufferTimeout = j;
            return this;
        }
        throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
    }

    /**
     * Reports the buffer timeout last stored via {@link #setBufferTimeout(long)} (or the initial value).
     *
     * @return the current buffer timeout
     */
    public long getBufferTimeout() {
        return bufferTimeout;
    }

    /**
     * Switches the chaining flag of the stream graph off.
     *
     * @return this environment, to allow call chaining
     */
    public StreamExecutionEnvironment disableOperatorChaining() {
        final StreamGraph graph = streamGraph;
        graph.setChaining(false);
        return this;
    }

    /**
     * Marks checkpointing as enabled on the stream graph and records the given interval.
     *
     * @param j the checkpointing interval handed to the stream graph
     * @return this environment, to allow call chaining
     */
    public StreamExecutionEnvironment enableCheckpointing(long j) {
        final StreamGraph graph = streamGraph;
        graph.setCheckpointingEnabled(true);
        graph.setCheckpointingInterval(j);
        return this;
    }

    /**
     * Marks checkpointing as enabled on the stream graph, records the given interval and,
     * when requested, additionally asks the stream graph to force checkpointing.
     *
     * @param j the checkpointing interval handed to the stream graph
     * @param z when {@code true}, {@code forceCheckpoint()} is invoked on the stream graph
     * @return this environment, to allow call chaining
     * @deprecated flagged as deprecated; the intended replacement is not visible here
     *     (presumably {@link #enableCheckpointing(long)} -- TODO confirm)
     */
    @Deprecated
    public StreamExecutionEnvironment enableCheckpointing(long j, boolean z) {
        final StreamGraph graph = streamGraph;
        graph.setCheckpointingEnabled(true);
        graph.setCheckpointingInterval(j);
        if (!z) {
            return this;
        }
        graph.forceCheckpoint();
        return this;
    }

    /**
     * Marks checkpointing as enabled on the stream graph without touching the interval.
     *
     * @return this environment, to allow call chaining
     */
    public StreamExecutionEnvironment enableCheckpointing() {
        final StreamGraph graph = streamGraph;
        graph.setCheckpointingEnabled(true);
        return this;
    }

    /**
     * Passes the given state handle provider on to the stream graph.
     *
     * @param stateHandleProvider the provider to register with the stream graph
     * @return this environment, to allow call chaining
     */
    public StreamExecutionEnvironment setStateHandleProvider(StateHandleProvider<?> stateHandleProvider) {
        final StreamGraph graph = streamGraph;
        graph.setStateHandleProvider(stateHandleProvider);
        return this;
    }

    /**
     * Stores the number of execution retries in the execution configuration.
     *
     * @param i the retry count forwarded to the configuration
     */
    public void setNumberOfExecutionRetries(int i) {
        final ExecutionConfig executionConfig = config;
        executionConfig.setNumberOfExecutionRetries(i);
    }

    /**
     * Reads the number of execution retries from the execution configuration.
     *
     * @return the retry count reported by the configuration
     */
    public int getNumberOfExecutionRetries() {
        final int retries = config.getNumberOfExecutionRetries();
        return retries;
    }

    /**
     * Replaces the parallelism that {@link #createLocalEnvironment()} applies to new local
     * environments. The value is stored as given; no range check happens here.
     *
     * @param i the new default parallelism for local environments
     */
    public static void setDefaultLocalParallelism(int i) {
        StreamExecutionEnvironment.defaultLocalParallelism = i;
    }

    /**
     * Forwards a default Kryo serializer instance for the given type to the execution configuration.
     *
     * @param cls the type the serializer is associated with
     * @param serializer the serializer instance to use
     */
    public void addDefaultKryoSerializer(Class<?> cls, Serializer<?> serializer) {
        final ExecutionConfig executionConfig = config;
        executionConfig.addDefaultKryoSerializer(cls, serializer);
    }

    /**
     * Forwards a default Kryo serializer class for the given type to the execution configuration.
     *
     * @param cls the type the serializer is associated with
     * @param cls2 the serializer class to use
     */
    public void addDefaultKryoSerializer(Class<?> cls, Class<? extends Serializer<?>> cls2) {
        final ExecutionConfig executionConfig = config;
        executionConfig.addDefaultKryoSerializer(cls, cls2);
    }

    /**
     * Forwards a type/serializer-instance registration to the execution configuration.
     *
     * @param cls the type to register
     * @param serializer the serializer instance to register for that type
     */
    public void registerTypeWithKryoSerializer(Class<?> cls, Serializer<?> serializer) {
        final ExecutionConfig executionConfig = config;
        executionConfig.registerTypeWithKryoSerializer(cls, serializer);
    }

    /**
     * Forwards a type/serializer-class registration to the execution configuration.
     *
     * @param cls the type to register
     * @param cls2 the serializer class to register for that type
     */
    public void registerTypeWithKryoSerializer(Class<?> cls, Class<? extends Serializer<?>> cls2) {
        final ExecutionConfig executionConfig = config;
        executionConfig.registerTypeWithKryoSerializer(cls, cls2);
    }

    /**
     * Registers a type with the execution configuration. Types for which the type extractor
     * yields a {@link PojoTypeInfo} are registered as POJO types, all others as Kryo types.
     *
     * @param cls the type to register; must not be {@code null}
     * @throws NullPointerException if {@code cls} is {@code null}
     */
    public void registerType(Class<?> cls) {
        Preconditions.checkNotNull(cls, "Cannot register null type class.");
        final boolean isPojo = TypeExtractor.createTypeInfo(cls) instanceof PojoTypeInfo;
        if (!isPojo) {
            config.registerKryoType(cls);
            return;
        }
        config.registerPojoType(cls);
    }

    /**
     * Creates a source named "Sequence Source" that is backed by a {@link NumberSequenceIterator}
     * over the given bounds.
     *
     * @param j the start of the sequence
     * @param j2 the end of the sequence; must not be smaller than {@code j}
     * @return the stream source producing the sequence
     * @throws IllegalArgumentException if {@code j} is greater than {@code j2}
     */
    public DataStreamSource<Long> generateSequence(long j, long j2) {
        if (j <= j2) {
            final NumberSequenceIterator numbers = new NumberSequenceIterator(j, j2);
            return fromCollection(numbers, BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
        }
        throw new IllegalArgumentException("Start of sequence must not be greater than the end");
    }

    /**
     * Creates a source named "Parallel Sequence Source" that is backed by a
     * {@link NumberSequenceIterator} over the given bounds and handed to the splittable-iterator
     * source path.
     *
     * <p>The bounds are validated up front, exactly as {@link #generateSequence(long, long)}
     * does, so both sequence factories reject an inverted range in the same way.
     *
     * @param j the start of the sequence
     * @param j2 the end of the sequence; must not be smaller than {@code j}
     * @return the stream source producing the sequence
     * @throws IllegalArgumentException if {@code j} is greater than {@code j2}
     */
    public DataStreamSource<Long> generateParallelSequence(long j, long j2) {
        if (j > j2) {
            throw new IllegalArgumentException("Start of sequence must not be greater than the end");
        }
        return fromParallelCollection(new NumberSequenceIterator(j, j2), BasicTypeInfo.LONG_TYPE_INFO, "Parallel Sequence Source");
    }

    /**
     * Creates a source named "Elements source" from the given elements. The produced type is
     * derived from the first element.
     *
     * @param outArr the elements to emit; at least one is required
     * @param <OUT> the element type
     * @return the stream source emitting the given elements
     * @throws IllegalArgumentException if no element is passed
     */
    public <OUT> DataStreamSource<OUT> fromElements(OUT... outArr) {
        final boolean noElements = outArr.length == 0;
        if (noElements) {
            throw new IllegalArgumentException("fromElements needs at least one element as argument");
        }
        // The source is added first and the type is extracted afterwards, in that order.
        final DataStreamSource<OUT> source = addSource(new FromElementsFunction<OUT>(outArr), "Elements source");
        return source.returns(TypeExtractor.getForObject(outArr[0]));
    }

    /**
     * Creates a source named "Collection Source" from a non-empty collection. The produced type
     * is derived from the first element, and every element is verified against that type.
     *
     * @param collection the elements to emit; must be non-null, non-empty and free of null elements
     * @param <OUT> the element type
     * @return the stream source emitting the collection's elements
     * @throws NullPointerException if the collection or one of its elements is {@code null}
     * @throws IllegalArgumentException if the collection is empty or holds an element that is
     *     not assignable to the type of the first element
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> collection) {
        Preconditions.checkNotNull(collection, "Collection must not be null");
        if (collection.isEmpty()) {
            throw new IllegalArgumentException("Collection must not be empty");
        }
        final OUT first = collection.iterator().next();
        // Reject a null first element here so the failure carries the same message as for any
        // other null element instead of surfacing from inside the type extractor.
        Preconditions.checkNotNull(first, "The collection must not contain null elements.");
        final TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(first);
        final FromElementsFunction<OUT> function = new FromElementsFunction<OUT>(collection);
        checkCollection(collection, typeInfo.getTypeClass());
        return addSource(function, "Collection Source").returns(typeInfo);
    }

    /**
     * Creates a source named "Collection Source" from a non-empty collection, using the
     * supplied type information. Every element is verified against the type class of that
     * type information.
     *
     * @param collection the elements to emit; must be non-null and non-empty
     * @param typeInformation the type information describing the elements
     * @param <OUT> the element type
     * @return the stream source emitting the collection's elements
     * @throws IllegalArgumentException if the collection is empty or an element does not match the type class
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> collection, TypeInformation<OUT> typeInformation) {
        Preconditions.checkNotNull(collection, "Collection must not be null");
        final boolean empty = collection.isEmpty();
        if (empty) {
            throw new IllegalArgumentException("Collection must not be empty");
        }
        final FromElementsFunction<OUT> function = new FromElementsFunction<OUT>(collection);
        final Class<OUT> elementClass = typeInformation.getTypeClass();
        checkCollection(collection, elementClass);
        final DataStreamSource<OUT> source = addSource(function, "Collection Source");
        return source.returns(typeInformation);
    }

    /**
     * Creates an iterator-backed source; the type information is derived from the given class
     * before delegating to {@link #fromCollection(Iterator, TypeInformation)}.
     *
     * @param it the iterator supplying the elements
     * @param cls the class of the elements
     * @param <OUT> the element type
     * @return the stream source emitting the iterator's elements
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> it, Class<OUT> cls) {
        final TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(cls);
        return fromCollection(it, typeInfo);
    }

    /**
     * Creates a source named "Collection Source" that is fed by the given iterator.
     *
     * @param it the iterator supplying the elements; must not be {@code null}
     * @param typeInformation the type information describing the elements
     * @param <OUT> the element type
     * @return the stream source emitting the iterator's elements
     * @throws NullPointerException if {@code it} is {@code null}
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> it, TypeInformation<OUT> typeInformation) {
        Preconditions.checkNotNull(it, "The iterator must not be null");
        final FromIteratorFunction<OUT> function = new FromIteratorFunction<OUT>(it);
        final DataStreamSource<OUT> source = addSource(function, "Collection Source");
        return source.returns(typeInformation);
    }

    /**
     * Internal variant of the iterator-backed source factory that lets the caller choose the
     * source name. No null check is performed on the iterator here.
     *
     * @param it the iterator supplying the elements
     * @param typeInformation the type information describing the elements
     * @param str the name given to the source
     * @param <OUT> the element type
     * @return the stream source emitting the iterator's elements
     */
    private <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> it, TypeInformation<OUT> typeInformation, String str) {
        final FromIteratorFunction<OUT> function = new FromIteratorFunction<OUT>(it);
        final DataStreamSource<OUT> source = addSource(function, str);
        return source.returns(typeInformation);
    }

    /**
     * Creates a source from a splittable iterator; the type information is derived from the
     * given class before delegating to
     * {@link #fromParallelCollection(SplittableIterator, TypeInformation)}.
     *
     * @param splittableIterator the iterator supplying the elements
     * @param cls the class of the elements
     * @param <OUT> the element type
     * @return the stream source emitting the iterator's elements
     */
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> splittableIterator, Class<OUT> cls) {
        final TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(cls);
        return fromParallelCollection(splittableIterator, typeInfo);
    }

    /**
     * Creates a source named "Parallel Collection Source" from a splittable iterator.
     *
     * @param splittableIterator the iterator supplying the elements
     * @param typeInformation the type information describing the elements
     * @param <OUT> the element type
     * @return the stream source emitting the iterator's elements
     */
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> splittableIterator, TypeInformation<OUT> typeInformation) {
        final String sourceName = "Parallel Collection Source";
        return fromParallelCollection(splittableIterator, typeInformation, sourceName);
    }

    /**
     * Internal variant of the splittable-iterator source factory that lets the caller choose
     * the source name.
     *
     * @param splittableIterator the iterator supplying the elements
     * @param typeInformation the type information describing the elements
     * @param str the name given to the source
     * @param <OUT> the element type
     * @return the stream source emitting the iterator's elements
     */
    private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> splittableIterator, TypeInformation<OUT> typeInformation, String str) {
        final FromSplittableIteratorFunction<OUT> function = new FromSplittableIteratorFunction<OUT>(splittableIterator);
        final DataStreamSource<OUT> source = addSource(function, str);
        return source.returns(typeInformation);
    }

    /**
     * Creates a source named "Read Text File Source" that reads the given path through a
     * {@link TextInputFormat}.
     *
     * @param str the file path; must not be {@code null}
     * @return the stream source producing strings
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public DataStreamSource<String> readTextFile(String str) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        final TextInputFormat format = new TextInputFormat(new Path(str));
        return createInput(format, BasicTypeInfo.STRING_TYPE_INFO, "Read Text File Source");
    }

    /**
     * Creates a source named "Read Text File Source" that reads the given path through a
     * {@link TextInputFormat} configured with the given charset name.
     *
     * @param str the file path; must not be {@code null}
     * @param str2 the charset name set on the input format
     * @return the stream source producing strings
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public DataStreamSource<String> readTextFile(String str, String str2) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        final Path path = new Path(str);
        final TextInputFormat format = new TextInputFormat(path);
        format.setCharsetName(str2);
        return createInput(format, BasicTypeInfo.STRING_TYPE_INFO, "Read Text File Source");
    }

    /**
     * Creates a source named "Read Text File with Value source" that reads the given path
     * through a {@link TextValueInputFormat} and produces {@link StringValue} records.
     *
     * @param str the file path; must not be {@code null}
     * @return the stream source producing {@link StringValue} records
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public DataStreamSource<StringValue> readTextFileWithValue(String str) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        final TextValueInputFormat format = new TextValueInputFormat(new Path(str));
        final ValueTypeInfo<StringValue> typeInfo = new ValueTypeInfo<StringValue>(StringValue.class);
        return createInput(format, typeInfo, "Read Text File with Value source");
    }

    /**
     * Creates a source named "Read Text File with Value source" that reads the given path
     * through a {@link TextValueInputFormat} configured with a charset name and the
     * skip-invalid-lines flag.
     *
     * @param str the file path; must not be {@code null}
     * @param str2 the charset name set on the input format
     * @param z the value passed to {@code setSkipInvalidLines} on the input format
     * @return the stream source producing {@link StringValue} records
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public DataStreamSource<StringValue> readTextFileWithValue(String str, String str2, boolean z) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        final Path path = new Path(str);
        final TextValueInputFormat format = new TextValueInputFormat(path);
        final ValueTypeInfo<StringValue> typeInfo = new ValueTypeInfo<StringValue>(StringValue.class);
        format.setCharsetName(str2);
        format.setSkipInvalidLines(z);
        return createInput(format, typeInfo, "Read Text File with Value source");
    }

    /**
     * Creates a source named "Read File source" that reads the given path with the supplied
     * file input format. The path is set on the format, and the produced type is determined
     * through the type extractor.
     *
     * @param fileInputFormat the input format to read with; must not be {@code null}
     * @param str the file path; must not be {@code null}
     * @param <OUT> the type produced by the input format
     * @return the stream source producing the format's records
     * @throws NullPointerException if either argument is {@code null}
     * @throws InvalidProgramException if creating the input fails; the triggering exception is
     *     attached as the cause
     */
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> fileInputFormat, String str) {
        Preconditions.checkNotNull(fileInputFormat, "InputFormat must not be null.");
        Preconditions.checkNotNull(str, "The file path must not be null.");
        fileInputFormat.setFilePath(new Path(str));
        try {
            return createInput(fileInputFormat, TypeExtractor.getInputFormatTypes(fileInputFormat), "Read File source");
        } catch (Exception e) {
            final InvalidProgramException failure = new InvalidProgramException("The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
            // Keep the root cause for diagnosis; initCause is used so that only the
            // single-String constructor already relied upon here is needed.
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Creates a source named "Read File of Primitives source" that reads the given path
     * through a {@link PrimitiveInputFormat} for the given class.
     *
     * @param str the file path; must not be {@code null}
     * @param cls the class of the produced elements
     * @param <OUT> the element type
     * @return the stream source producing the elements
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String str, Class<OUT> cls) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        final PrimitiveInputFormat<OUT> format = new PrimitiveInputFormat<OUT>(new Path(str), cls);
        final TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(cls);
        return createInput(format, typeInfo, "Read File of Primitives source");
    }

    /**
     * Creates a source named "Read File of Primitives source" that reads the given path
     * through a {@link PrimitiveInputFormat} built from the path, the extra string argument
     * and the given class.
     *
     * @param str the file path; must not be {@code null}
     * @param str2 second constructor argument of the input format (presumably the delimiter -- TODO confirm)
     * @param cls the class of the produced elements
     * @param <OUT> the element type
     * @return the stream source producing the elements
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String str, String str2, Class<OUT> cls) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        final PrimitiveInputFormat<OUT> format = new PrimitiveInputFormat<OUT>(new Path(str), str2, cls);
        final TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(cls);
        return createInput(format, typeInfo, "Read File of Primitives source");
    }

    /**
     * Adds a {@link FileMonitoringFunction} source named "Read File Stream source" and
     * flat-maps its output through a {@link FileReadFunction}.
     *
     * @param str first argument of the monitoring function (presumably the path to watch -- TODO confirm)
     * @param j second argument of the monitoring function (presumably the polling interval -- TODO confirm)
     * @param watchType the watch type handed to the monitoring function
     * @return the stream resulting from the flat-map step
     */
    public DataStream<String> readFileStream(String str, long j, FileMonitoringFunction.WatchType watchType) {
        final FileMonitoringFunction monitor = new FileMonitoringFunction(str, j, watchType);
        return addSource(monitor, "Read File Stream source").flatMap(new FileReadFunction());
    }

    /**
     * Adds a {@link SocketTextStreamFunction} source named "Socket Stream".
     *
     * @param str first argument of the socket function (presumably the host name -- TODO confirm)
     * @param i second argument of the socket function (presumably the port -- TODO confirm)
     * @param c third argument of the socket function (presumably the record delimiter -- TODO confirm)
     * @param j fourth argument of the socket function (presumably a retry limit -- TODO confirm)
     * @return the stream source producing strings
     */
    public DataStreamSource<String> socketTextStream(String str, int i, char c, long j) {
        final SocketTextStreamFunction socketFunction = new SocketTextStreamFunction(str, i, c, j);
        return addSource(socketFunction, "Socket Stream");
    }

    /**
     * Convenience overload of {@link #socketTextStream(String, int, char, long)} that passes
     * {@code 0} as the last argument.
     *
     * @param str forwarded unchanged
     * @param i forwarded unchanged
     * @param c forwarded unchanged
     * @return the stream source producing strings
     */
    public DataStreamSource<String> socketTextStream(String str, int i, char c) {
        final long lastArgument = 0L;
        return socketTextStream(str, i, c, lastArgument);
    }

    /**
     * Convenience overload of {@link #socketTextStream(String, int, char)} that passes the
     * newline character as the third argument.
     *
     * @param str forwarded unchanged
     * @param i forwarded unchanged
     * @return the stream source producing strings
     */
    public DataStreamSource<String> socketTextStream(String str, int i) {
        final char newline = '\n';
        return socketTextStream(str, i, newline);
    }

    /**
     * Creates a source named "Custom File source" for the given input format; the produced
     * type is determined through the type extractor.
     *
     * @param inputFormat the input format to read with
     * @param <OUT> the type produced by the input format
     * @return the stream source producing the format's records
     */
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
        final TypeInformation<OUT> typeInfo = TypeExtractor.getInputFormatTypes(inputFormat);
        return createInput(inputFormat, typeInfo, "Custom File source");
    }

    /**
     * Creates a source named "Custom File source" for the given input format, using the
     * supplied type information.
     *
     * @param inputFormat the input format to read with
     * @param typeInformation the type information describing the produced records
     * @param <OUT> the type produced by the input format
     * @return the stream source producing the format's records
     */
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInformation) {
        final String sourceName = "Custom File source";
        return createInput(inputFormat, typeInformation, sourceName);
    }

    /**
     * Internal input factory: wraps the input format in a {@link FileSourceFunction}, adds it
     * as a source under the given name and registers the input format with the stream graph
     * under the new source's id.
     *
     * @param inputFormat the input format to read with
     * @param typeInformation the type information describing the produced records
     * @param str the name given to the source
     * @param <OUT> the type produced by the input format
     * @return the stream source producing the format's records
     */
    private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInformation, String str) {
        final FileSourceFunction<OUT> function = new FileSourceFunction<OUT>(inputFormat, typeInformation);
        final DataStreamSource<OUT> source = addSource(function, str).returns(typeInformation);
        streamGraph.setInputFormat(source.getId(), inputFormat);
        return source;
    }

    /**
     * Adds the given source function under the name "Custom Source".
     *
     * @param sourceFunction the user-defined source function
     * @param <OUT> the type produced by the source
     * @return the stream source wrapping the function
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) {
        final String sourceName = "Custom Source";
        return addSource(sourceFunction, sourceName);
    }

    /**
     * Adds the given source function under the given name and wraps it in a
     * {@link DataStreamSource}.
     *
     * <p>The produced type is taken from the function itself when it implements
     * {@link ResultTypeQueryable}; otherwise it is extracted reflectively. If extraction fails
     * with an {@link InvalidTypesException}, a {@link MissingTypeInfo} placeholder carrying
     * that exception is used instead. The function is passed through {@link #clean(Object)}
     * before the source is created.
     *
     * @param sourceFunction the user-defined source function
     * @param str the name of the source; passed as both the second and the last constructor
     *     argument of the created {@link DataStreamSource}
     * @param <OUT> the type produced by the source
     * @return the stream source wrapping the function
     */
    @SuppressWarnings("unchecked")
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction, String str) {
        // Declared as the general TypeInformation type: it may hold the function's own type,
        // an extracted type, or the MissingTypeInfo placeholder.
        TypeInformation<OUT> outTypeInfo;
        if (sourceFunction instanceof ResultTypeQueryable) {
            outTypeInfo = ((ResultTypeQueryable<OUT>) sourceFunction).getProducedType();
        } else {
            try {
                outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, sourceFunction.getClass(), 0, null, null);
            } catch (InvalidTypesException e) {
                // Unchecked by design: the placeholder stands in for the real type.
                outTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
            }
        }
        clean(sourceFunction);
        final StreamSource<OUT> sourceOperator = new StreamSource<OUT>(sourceFunction);
        final boolean isParallel = sourceFunction instanceof ParallelSourceFunction;
        return new DataStreamSource<OUT>(this, str, outTypeInfo, sourceOperator, isParallel, str);
    }

    /**
     * Returns the cached environment if one exists; otherwise picks one based on the type of
     * the environment reported by {@link ExecutionEnvironment#getExecutionEnvironment()}:
     * a context environment yields a stream context environment, an optimizer-plan or
     * preview-plan environment yields a {@link StreamPlanEnvironment}, and anything else
     * falls back to {@link #createLocalEnvironment()}.
     *
     * @return the environment to build the streaming program on
     */
    public static StreamExecutionEnvironment getExecutionEnvironment() {
        if (currentEnvironment != null) {
            return currentEnvironment;
        }
        final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        if (batchEnv instanceof ContextEnvironment) {
            final ContextEnvironment ctx = (ContextEnvironment) batchEnv;
            currentEnvironment = createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getParallelism(), ctx.isWait());
            return currentEnvironment;
        }
        final boolean planOnly = batchEnv instanceof Client.OptimizerPlanEnvironment
                || batchEnv instanceof PackagedProgram.PreviewPlanEnvironment;
        if (planOnly) {
            currentEnvironment = new StreamPlanEnvironment(batchEnv);
            return currentEnvironment;
        }
        return createLocalEnvironment();
    }

    /**
     * Builds a {@link StreamContextEnvironment} from the given arguments, which are passed to
     * its constructor unchanged.
     *
     * @param client the client handed to the context environment
     * @param list the files handed to the context environment
     * @param i the third constructor argument (the caller in this file passes a parallelism)
     * @param z the fourth constructor argument (the caller in this file passes a wait flag)
     * @return the newly created context environment
     */
    private static StreamExecutionEnvironment createContextEnvironment(Client client, List<File> list, int i, boolean z) {
        final StreamExecutionEnvironment contextEnv = new StreamContextEnvironment(client, list, i, z);
        return contextEnv;
    }

    /**
     * Creates a local environment using the current default local parallelism.
     *
     * @return the newly created local environment
     */
    public static LocalStreamEnvironment createLocalEnvironment() {
        final int parallelism = defaultLocalParallelism;
        return createLocalEnvironment(parallelism);
    }

    /**
     * Creates a local environment, caches it as the current environment and then applies the
     * given parallelism to it.
     *
     * @param i the parallelism to set on the new environment
     * @return the newly created local environment
     */
    public static LocalStreamEnvironment createLocalEnvironment(int i) {
        final LocalStreamEnvironment localEnv = new LocalStreamEnvironment();
        // Cache first, configure second -- the same sequence as before.
        currentEnvironment = localEnv;
        localEnv.setParallelism(i);
        return localEnv;
    }

    /**
     * Creates a {@link RemoteStreamEnvironment} from the given arguments and caches it as the
     * current environment.
     *
     * @param str first constructor argument (presumably the host -- TODO confirm)
     * @param i second constructor argument (presumably the port -- TODO confirm)
     * @param strArr remaining constructor arguments (presumably jar file paths -- TODO confirm)
     * @return the newly created remote environment
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(String str, int i, String... strArr) {
        final StreamExecutionEnvironment remoteEnv = new RemoteStreamEnvironment(str, i, strArr);
        currentEnvironment = remoteEnv;
        return remoteEnv;
    }

    /**
     * Creates a {@link RemoteStreamEnvironment} from the given arguments, caches it as the
     * current environment and then applies the given parallelism to it.
     *
     * @param str first constructor argument (presumably the host -- TODO confirm)
     * @param i second constructor argument (presumably the port -- TODO confirm)
     * @param i2 the parallelism to set on the new environment
     * @param strArr remaining constructor arguments (presumably jar file paths -- TODO confirm)
     * @return the newly created remote environment
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(String str, int i, int i2, String... strArr) {
        final StreamExecutionEnvironment remoteEnv = new RemoteStreamEnvironment(str, i, strArr);
        // Cache first, configure second -- the same sequence as before.
        currentEnvironment = remoteEnv;
        remoteEnv.setParallelism(i2);
        return remoteEnv;
    }

    /**
     * Executes the program; the behaviour is defined by the concrete environment subclass.
     *
     * @return the result object of the job execution
     * @throws Exception if the subclass's execution fails
     */
    public abstract JobExecutionResult execute() throws Exception;

    /**
     * Executes the program; the behaviour is defined by the concrete environment subclass.
     *
     * @param str string argument for the execution (presumably the job name -- TODO confirm in subclasses)
     * @return the result object of the job execution
     * @throws Exception if the subclass's execution fails
     */
    public abstract JobExecutionResult execute(String str) throws Exception;

    /**
     * Gives access to the stream graph held by this environment.
     *
     * @return the {@link StreamGraph} instance of this environment
     */
    public StreamGraph getStreamGraph() {
        return streamGraph;
    }

    /**
     * Asks the stream graph obtained from {@link #getStreamGraph()} for its streaming plan
     * in JSON form.
     *
     * @return the plan string produced by the stream graph
     */
    public String getExecutionPlan() {
        final StreamGraph graph = getStreamGraph();
        return graph.getStreamingPlanAsJSON();
    }

    /**
     * Replaces the cached current environment with one created by the given factory.
     *
     * @param streamExecutionEnvironmentFactory the factory asked to create the environment
     */
    protected static void initializeFromFactory(StreamExecutionEnvironmentFactory streamExecutionEnvironmentFactory) {
        final StreamExecutionEnvironment created = streamExecutionEnvironmentFactory.createExecutionEnvironment();
        currentEnvironment = created;
    }

    /**
     * Verifies that every element of the collection is non-null and an instance of the given
     * class (or of a subtype of it).
     *
     * @param collection the elements to check
     * @param cls the class every element must be assignable to; must not be {@code null}
     * @param <OUT> the element type
     * @throws NullPointerException if {@code cls} or any element is {@code null}
     * @throws IllegalArgumentException if an element is not assignable to {@code cls}
     */
    private static <OUT> void checkCollection(Collection<OUT> collection, Class<OUT> cls) {
        Preconditions.checkNotNull(cls);
        final Iterator<OUT> elements = collection.iterator();
        while (elements.hasNext()) {
            final OUT element = elements.next();
            Preconditions.checkNotNull(element, "The collection must not contain null elements.");
            final boolean matchesType = cls.isAssignableFrom(element.getClass());
            if (matchesType) {
                continue;
            }
            throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + cls.getCanonicalName());
        }
    }

    /**
     * Runs the closure cleaner on the given object when the configuration has it enabled,
     * then -- in every case -- asks the closure cleaner to ensure the object is serializable.
     *
     * @param f the object (typically a user function) to process
     * @param <F> the object's type
     * @return the same object that was passed in
     */
    public <F> F clean(F f) {
        final boolean cleanerEnabled = getConfig().isClosureCleanerEnabled();
        if (cleanerEnabled) {
            ClosureCleaner.clean(f, true);
        }
        ClosureCleaner.ensureSerializable(f);
        return f;
    }
}
