package org.apache.flink.streaming.api.datastream;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.partitioner.DistributePartitioner;
import org.apache.flink.streaming.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeWrapper;

/* loaded from: input_file:org/apache/flink/streaming/api/datastream/DataStream.class */
/**
 * A stream of elements of type {@code OUT} inside a streaming job that is being
 * assembled through a {@link JobGraphBuilder}.
 *
 * <p>Transformation methods never modify the receiver. They work on a
 * {@link #copy()} of it, register a new vertex and the connecting edges with the
 * job graph builder, and return a new stream object describing the result.
 *
 * <p>A stream may represent the union of several streams (see
 * {@link #merge(DataStream[])}); {@link #mergedStreams} then lists every member,
 * with this stream itself always stored at index 0.
 *
 * @param <OUT> type of the elements carried by this stream
 */
public class DataStream<OUT> {
    /**
     * Number of streams created so far through the primary constructor; appended to
     * the operator name to build {@link #id}. The increment is a plain, unsynchronized
     * read-modify-write on a static field.
     */
    protected static Integer counter = 0;
    /** Environment this stream was created in. */
    protected final StreamExecutionEnvironment environment;
    /** Identifier of this stream, of the form {@code <operatorName>-<counter>}. */
    protected final String id;
    /** Parallelism used when vertices are added on behalf of this stream. */
    protected int degreeOfParallelism;
    /** Names forwarded to {@code JobGraphBuilder.setEdge} when outgoing edges are created. */
    protected List<String> userDefinedNames;
    /** Flag forwarded to {@code JobGraphBuilder.setEdge} together with {@link #userDefinedNames}. */
    protected boolean selectAll;
    /** Partitioner applied to the edges that leave this stream. */
    protected StreamPartitioner<OUT> partitioner;
    /** Carrier of the type information for {@code OUT}. */
    protected final TypeWrapper<OUT> outTypeWrapper;
    /** All streams merged into this one; element 0 is always this stream itself. */
    protected List<DataStream<OUT>> mergedStreams;
    /** Job graph builder obtained from {@link #environment}. */
    protected final JobGraphBuilder jobGraphBuilder;

    /* loaded from: input_file:org/apache/flink/streaming/api/datastream/DataStream$ConnectionType.class */
    /** Names of the available connection (partitioning) strategies. */
    public enum ConnectionType {
        SHUFFLE,
        BROADCAST,
        FIELD,
        FORWARD,
        DISTRIBUTE
    }

    /**
     * Creates a new stream with a freshly generated id and a forward partitioner.
     *
     * @param streamExecutionEnvironment environment the stream belongs to; must not be {@code null}
     * @param str operator name used as the prefix of the generated id
     * @param typeWrapper type wrapper describing {@code OUT}
     * @throws NullPointerException if {@code streamExecutionEnvironment} is {@code null}
     */
    public DataStream(StreamExecutionEnvironment streamExecutionEnvironment, String str, TypeWrapper<OUT> typeWrapper) {
        if (streamExecutionEnvironment == null) {
            throw new NullPointerException("context is null");
        }
        // The id is built from the already incremented counter, so the first stream gets "-1".
        counter = counter + 1;
        this.id = str + "-" + counter;
        this.environment = streamExecutionEnvironment;
        this.degreeOfParallelism = streamExecutionEnvironment.getDegreeOfParallelism();
        this.jobGraphBuilder = streamExecutionEnvironment.getJobGraphBuilder();
        this.userDefinedNames = new ArrayList<>();
        this.selectAll = false;
        this.partitioner = new ForwardPartitioner();
        this.outTypeWrapper = typeWrapper;
        this.mergedStreams = new ArrayList<>();
        this.mergedStreams.add(this);
    }

    /**
     * Copy constructor. The copy keeps the id, environment, job graph builder, type
     * wrapper and partitioner instance of the source; the list of user defined names
     * is duplicated and every additional merged stream is copied recursively.
     *
     * @param dataStream stream to copy
     */
    public DataStream(DataStream<OUT> dataStream) {
        this.environment = dataStream.environment;
        this.id = dataStream.id;
        this.degreeOfParallelism = dataStream.degreeOfParallelism;
        this.userDefinedNames = new ArrayList<>(dataStream.userDefinedNames);
        this.selectAll = dataStream.selectAll;
        this.partitioner = dataStream.partitioner;
        this.jobGraphBuilder = dataStream.jobGraphBuilder;
        this.outTypeWrapper = dataStream.outTypeWrapper;
        this.mergedStreams = new ArrayList<>();
        this.mergedStreams.add(this);
        // Index 0 of the source list is the source itself, which this copy replaces.
        for (int i = 1; i < dataStream.mergedStreams.size(); i++) {
            this.mergedStreams.add(new DataStream<>(dataStream.mergedStreams.get(i)));
        }
    }

    /**
     * @return the identifier of this stream
     */
    public String getId() {
        return this.id;
    }

    /**
     * @return the degree of parallelism recorded for this stream
     */
    public int getParallelism() {
        return this.degreeOfParallelism;
    }

    /**
     * @return the type information of the elements of this stream
     */
    public TypeInformation<OUT> getOutputType() {
        return this.outTypeWrapper.getTypeInfo();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the class of the field at the given position of the output type.
     * For tuple types the field type at position {@code i} is used; for any other
     * type only position 0 is valid and denotes the type itself.
     *
     * @param i field position
     * @return class of the selected field
     * @throws IndexOutOfBoundsException if the output type is not a tuple and {@code i != 0}
     */
    public Class<?> getClassAtPos(int i) {
        TypeInformation<OUT> typeInfo = this.outTypeWrapper.getTypeInfo();
        if (typeInfo.isTupleType()) {
            // Only narrow to TupleTypeInfo once the type is known to be a tuple.
            return ((TupleTypeInfo) typeInfo).getTypeAt(i).getTypeClass();
        }
        if (i != 0) {
            throw new IndexOutOfBoundsException("Position is out of range");
        }
        return typeInfo.getTypeClass();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Verifies that {@code i} addresses an existing field of the output type.
     *
     * @param i field position to check
     * @throws RuntimeException if {@link #getClassAtPos(int)} reports the position as out of range;
     *         the original {@link IndexOutOfBoundsException} is attached as the cause
     */
    public void checkFieldRange(int i) {
        try {
            getClassAtPos(i);
        } catch (IndexOutOfBoundsException e) {
            throw new RuntimeException("Selected field is out of range", e);
        }
    }

    /**
     * Creates a new stream that is the union of this stream and the given streams.
     * Every member of every argument's merged list is copied into the result.
     *
     * <p>NOTE(review): each candidate id is compared only against the members of this
     * stream, so the same stream passed twice in {@code dataStreamArr} is not rejected.
     *
     * @param dataStreamArr streams to merge with this one
     * @return the merged stream
     * @throws RuntimeException if one of the streams to merge has the same id as a member of this stream
     */
    public DataStream<OUT> merge(DataStream<OUT>... dataStreamArr) {
        DataStream<OUT> copy = copy();
        for (DataStream<OUT> dataStream : dataStreamArr) {
            for (DataStream<OUT> dataStream2 : dataStream.mergedStreams) {
                validateMerge(dataStream2.getId());
                copy.mergedStreams.add(dataStream2.copy());
            }
        }
        return copy;
    }

    /**
     * Rejects {@code str} if it equals the id of any stream already merged into this one.
     *
     * @param str id of the stream about to be merged
     * @throws RuntimeException if the id is already present
     */
    private void validateMerge(String str) {
        for (DataStream<OUT> member : this.mergedStreams) {
            if (member.getId().equals(str)) {
                throw new RuntimeException("A DataStream cannot be merged with itself");
            }
        }
    }

    /**
     * Pairs this stream with another stream of a possibly different type.
     *
     * @param dataStream the second stream
     * @param <R> element type of the second stream
     * @return a {@link ConnectedDataStream} built from this stream and {@code dataStream}
     */
    public <R> ConnectedDataStream<OUT, R> connect(DataStream<R> dataStream) {
        return new ConnectedDataStream<>(this, dataStream);
    }

    /**
     * Returns a copy of this stream whose output uses a {@link FieldsPartitioner}
     * on the given field position.
     *
     * @param i position of the partitioning field; must be non-negative
     * @return the re-partitioned copy
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public DataStream<OUT> partitionBy(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("The position of the field must be non-negative");
        }
        return setConnectionType(new FieldsPartitioner(i));
    }

    /**
     * @return a copy of this stream whose output uses a {@link BroadcastPartitioner}
     */
    public DataStream<OUT> broadcast() {
        return setConnectionType(new BroadcastPartitioner());
    }

    /**
     * @return a copy of this stream whose output uses a {@link ShufflePartitioner}
     */
    public DataStream<OUT> shuffle() {
        return setConnectionType(new ShufflePartitioner());
    }

    /**
     * @return a copy of this stream whose output uses a {@link ForwardPartitioner}
     */
    public DataStream<OUT> forward() {
        return setConnectionType(new ForwardPartitioner());
    }

    /**
     * @return a copy of this stream whose output uses a {@link DistributePartitioner}
     */
    public DataStream<OUT> distribute() {
        return setConnectionType(new DistributePartitioner());
    }

    /**
     * Adds a vertex named "map" that runs the given function in a {@link MapInvokable}.
     *
     * @param mapFunction user function
     * @param <R> result element type
     * @return the resulting operator stream
     */
    public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapFunction) {
        return addFunction("map", mapFunction, new FunctionTypeWrapper(mapFunction, MapFunction.class, 0), new FunctionTypeWrapper(mapFunction, MapFunction.class, 1), new MapInvokable(mapFunction));
    }

    /**
     * Adds a vertex named "flatMap" that runs the given function in a {@link FlatMapInvokable}.
     *
     * @param flatMapFunction user function
     * @param <R> result element type
     * @return the resulting operator stream
     */
    public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapFunction) {
        return addFunction("flatMap", flatMapFunction, new FunctionTypeWrapper(flatMapFunction, FlatMapFunction.class, 0), new FunctionTypeWrapper(flatMapFunction, FlatMapFunction.class, 1), new FlatMapInvokable(flatMapFunction));
    }

    /**
     * Adds a vertex named "reduce" that runs the given function in a {@link StreamReduceInvokable}.
     *
     * @param reduceFunction user function
     * @return the resulting operator stream
     */
    public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
        return (SingleOutputStreamOperator<OUT, ?>) addFunction("reduce", reduceFunction, new FunctionTypeWrapper(reduceFunction, ReduceFunction.class, 0), new FunctionTypeWrapper(reduceFunction, ReduceFunction.class, 0), new StreamReduceInvokable(reduceFunction));
    }

    /**
     * Starts a projection of a copy of this stream onto the given field positions.
     *
     * @param iArr field positions to keep
     * @return a {@link StreamProjection} over a copy of this stream
     */
    public StreamProjection<OUT> project(int... iArr) {
        return new StreamProjection<>(copy(), iArr);
    }

    /**
     * @param i position of the grouping field
     * @return a {@link GroupedDataStream} built from this stream and {@code i}
     */
    public GroupedDataStream<OUT> groupBy(int i) {
        return new GroupedDataStream<>(this, i);
    }

    /**
     * Creates a batched view of this stream.
     *
     * @param j batch size; must be at least 1
     * @param j2 slide size; must be at least 1
     * @return a {@link BatchedDataStream} over this stream
     * @throws IllegalArgumentException if either argument is smaller than 1
     */
    public BatchedDataStream<OUT> batch(long j, long j2) {
        if (j < 1) {
            throw new IllegalArgumentException("Batch size must be positive");
        }
        if (j2 < 1) {
            throw new IllegalArgumentException("Slide size must be positive");
        }
        return new BatchedDataStream<>(this, j, j2);
    }

    /**
     * Same as {@link #batch(long, long)} with the slide size equal to the batch size.
     *
     * @param j batch size; must be at least 1
     * @return a {@link BatchedDataStream} over this stream
     */
    public BatchedDataStream<OUT> batch(long j) {
        return batch(j, j);
    }

    /**
     * Creates a windowed view of this stream.
     *
     * @param j window size; must be at least 1
     * @param j2 slide interval; must be at least 1
     * @param timeStamp time stamp object handed to the {@link WindowDataStream}
     * @return a {@link WindowDataStream} over this stream
     * @throws IllegalArgumentException if {@code j} or {@code j2} is smaller than 1
     */
    public WindowDataStream<OUT> window(long j, long j2, TimeStamp<OUT> timeStamp) {
        if (j < 1) {
            throw new IllegalArgumentException("Window size must be positive");
        }
        if (j2 < 1) {
            throw new IllegalArgumentException("Slide interval must be positive");
        }
        return new WindowDataStream<>(this, j, j2, timeStamp);
    }

    /**
     * Same as {@link #window(long, long, TimeStamp)} using a new {@link DefaultTimeStamp}.
     *
     * @param j window size; must be at least 1
     * @param j2 slide interval; must be at least 1
     * @return a {@link WindowDataStream} over this stream
     */
    public WindowDataStream<OUT> window(long j, long j2) {
        return window(j, j2, new DefaultTimeStamp());
    }

    /**
     * Same as {@link #window(long, long)} with the slide interval equal to the window size.
     *
     * @param j window size; must be at least 1
     * @return a {@link WindowDataStream} over this stream
     */
    public WindowDataStream<OUT> window(long j) {
        return window(j, j);
    }

    /**
     * Aggregates the stream with the sum function that
     * {@link SumAggregationFunction#getSumFunction} returns for the class of field {@code i}.
     *
     * @param i position of the field to sum
     * @return the resulting operator stream
     * @throws RuntimeException if {@code i} is out of range for the output type
     */
    public SingleOutputStreamOperator<OUT, ?> sum(int i) {
        checkFieldRange(i);
        return aggregate(SumAggregationFunction.getSumFunction(i, getClassAtPos(i)));
    }

    /**
     * Same as {@link #sum(int)} on position 0.
     *
     * @return the resulting operator stream
     */
    public SingleOutputStreamOperator<OUT, ?> sum() {
        return sum(0);
    }

    /**
     * Aggregates the stream with a {@link MinAggregationFunction} on field {@code i}.
     *
     * @param i position of the field to aggregate
     * @return the resulting operator stream
     * @throws RuntimeException if {@code i} is out of range for the output type
     */
    public SingleOutputStreamOperator<OUT, ?> min(int i) {
        checkFieldRange(i);
        return aggregate(new MinAggregationFunction(i));
    }

    /**
     * Same as {@link #minBy(int, boolean)} with the flag set to {@code true}.
     *
     * @param i position of the field to aggregate
     * @return the resulting operator stream
     */
    public SingleOutputStreamOperator<OUT, ?> minBy(int i) {
        return minBy(i, true);
    }

    /**
     * Aggregates the stream with a {@link MinByAggregationFunction} on field {@code i}.
     *
     * @param i position of the field to aggregate
     * @param z flag passed through to the {@link MinByAggregationFunction} constructor
     * @return the resulting operator stream
     * @throws RuntimeException if {@code i} is out of range for the output type
     */
    public SingleOutputStreamOperator<OUT, ?> minBy(int i, boolean z) {
        checkFieldRange(i);
        return aggregate(new MinByAggregationFunction(i, z));
    }

    /**
     * Same as {@link #min(int)} on position 0.
     *
     * @return the resulting operator stream
     */
    public SingleOutputStreamOperator<OUT, ?> min() {
        return min(0);
    }

    /**
     * Aggregates the stream with a {@link MaxAggregationFunction} on field {@code i}.
     *
     * @param i position of the field to aggregate
     * @return the resulting operator stream
     * @throws RuntimeException if {@code i} is out of range for the output type
     */
    public SingleOutputStreamOperator<OUT, ?> max(int i) {
        checkFieldRange(i);
        return aggregate(new MaxAggregationFunction(i));
    }

    /**
     * Same as {@link #maxBy(int, boolean)} with the flag set to {@code true}.
     *
     * @param i position of the field to aggregate
     * @return the resulting operator stream
     */
    public SingleOutputStreamOperator<OUT, ?> maxBy(int i) {
        return maxBy(i, true);
    }

    /**
     * Aggregates the stream with a {@link MaxByAggregationFunction} on field {@code i}.
     *
     * @param i position of the field to aggregate
     * @param z flag passed through to the {@link MaxByAggregationFunction} constructor
     * @return the resulting operator stream
     * @throws RuntimeException if {@code i} is out of range for the output type
     */
    public SingleOutputStreamOperator<OUT, ?> maxBy(int i, boolean z) {
        checkFieldRange(i);
        return aggregate(new MaxByAggregationFunction(i, z));
    }

    /**
     * Same as {@link #max(int)} on position 0.
     *
     * @return the resulting operator stream
     */
    public SingleOutputStreamOperator<OUT, ?> max() {
        return max(0);
    }

    /**
     * Adds a vertex named "counter" that runs a {@link CounterInvokable}. No user
     * function is attached to the vertex.
     *
     * @return the resulting operator stream of {@code Long} values
     */
    public SingleOutputStreamOperator<Long, ?> count() {
        return addFunction("counter", null, this.outTypeWrapper, new ObjectTypeWrapper(Long.valueOf(0L)), new CounterInvokable());
    }

    /**
     * Adds a vertex named "reduce" that runs the given aggregation in a
     * {@link StreamReduceInvokable}; input and output share this stream's type wrapper.
     *
     * @param aggregationFunction aggregation to apply
     * @return the resulting operator stream
     */
    protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregationFunction) {
        return (SingleOutputStreamOperator<OUT, ?>) addFunction("reduce", aggregationFunction, this.outTypeWrapper, this.outTypeWrapper, new StreamReduceInvokable(aggregationFunction));
    }

    /**
     * Adds a vertex named "filter" that runs the given function in a {@link FilterInvokable}.
     * The same type wrapper instance is used for input and output.
     *
     * @param filterFunction user function
     * @return the resulting operator stream
     */
    public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filterFunction) {
        FunctionTypeWrapper functionTypeWrapper = new FunctionTypeWrapper(filterFunction, FilterFunction.class, 0);
        return (SingleOutputStreamOperator<OUT, ?>) addFunction("filter", filterFunction, functionTypeWrapper, functionTypeWrapper, new FilterInvokable(filterFunction));
    }

    /**
     * Attaches a {@link PrintSinkFunction} sink to a copy of this stream.
     *
     * @return the created sink
     */
    public DataStreamSink<OUT> print() {
        return addSink(copy(), new PrintSinkFunction(), this.outTypeWrapper);
    }

    /**
     * Writes the stream as text through a {@link WriteSinkFunctionByBatches} with batch size 1.
     *
     * @param str target path (passed through to the sink function)
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsText(String str) {
        // The int literal selects the batch-size overload of the private helper.
        return writeAsText(this, str, new WriteFormatAsText<OUT>(), 1, null);
    }

    /**
     * Writes the stream as text through a {@link WriteSinkFunctionByMillis}.
     *
     * @param str target path (passed through to the sink function)
     * @param j value handed to the by-millis sink function; presumably the flush interval
     *        in milliseconds (confirm against {@link WriteSinkFunctionByMillis})
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsText(String str, long j) {
        return writeAsText(this, str, new WriteFormatAsText<OUT>(), j, null);
    }

    /**
     * Writes the stream as text through a {@link WriteSinkFunctionByBatches}.
     *
     * @param str target path (passed through to the sink function)
     * @param i value handed to the by-batches sink function; presumably the batch size
     *        (confirm against {@link WriteSinkFunctionByBatches})
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsText(String str, int i) {
        return writeAsText(this, str, new WriteFormatAsText<OUT>(), i, null);
    }

    /**
     * Writes the stream as text through a {@link WriteSinkFunctionByMillis}.
     *
     * @param str target path (passed through to the sink function)
     * @param j value handed to the by-millis sink function
     * @param out extra element handed to the sink function; presumably an end-of-stream
     *        marker (confirm against {@link WriteSinkFunctionByMillis})
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsText(String str, long j, OUT out) {
        return writeAsText(this, str, new WriteFormatAsText<OUT>(), j, out);
    }

    /**
     * Writes the stream as text through a {@link WriteSinkFunctionByBatches}.
     *
     * @param str target path (passed through to the sink function)
     * @param i value handed to the by-batches sink function
     * @param out extra element handed to the sink function; presumably an end-of-stream
     *        marker (confirm against {@link WriteSinkFunctionByBatches})
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsText(String str, int i, OUT out) {
        return writeAsText(this, str, new WriteFormatAsText<OUT>(), i, out);
    }

    /**
     * Adds a {@link WriteSinkFunctionByMillis} text sink to {@code dataStream} and sets
     * the mutability of the sink vertex to {@code false} in the job graph builder.
     */
    private DataStreamSink<OUT> writeAsText(DataStream<OUT> dataStream, String str, WriteFormatAsText<OUT> writeFormatAsText, long j, OUT out) {
        DataStreamSink<OUT> addSink = addSink(dataStream, new WriteSinkFunctionByMillis(str, writeFormatAsText, j, out), dataStream.outTypeWrapper);
        this.jobGraphBuilder.setMutability(addSink.getId(), false);
        return addSink;
    }

    /**
     * Adds a {@link WriteSinkFunctionByBatches} text sink to {@code dataStream} and sets
     * the mutability of the sink vertex to {@code false} in the job graph builder.
     */
    private DataStreamSink<OUT> writeAsText(DataStream<OUT> dataStream, String str, WriteFormatAsText<OUT> writeFormatAsText, int i, OUT out) {
        DataStreamSink<OUT> addSink = addSink(dataStream, new WriteSinkFunctionByBatches(str, writeFormatAsText, i, out), dataStream.outTypeWrapper);
        this.jobGraphBuilder.setMutability(addSink.getId(), false);
        return addSink;
    }

    /**
     * Writes the stream as CSV through a {@link WriteSinkFunctionByBatches} with batch size 1.
     *
     * @param str target path (passed through to the sink function)
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsCsv(String str) {
        // The int literal selects the batch-size overload of the private helper.
        return writeAsCsv(this, str, new WriteFormatAsCsv<OUT>(), 1, null);
    }

    /**
     * Writes the stream as CSV through a {@link WriteSinkFunctionByMillis}.
     *
     * @param str target path (passed through to the sink function)
     * @param j value handed to the by-millis sink function; presumably the flush interval
     *        in milliseconds (confirm against {@link WriteSinkFunctionByMillis})
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsCsv(String str, long j) {
        return writeAsCsv(this, str, new WriteFormatAsCsv<OUT>(), j, null);
    }

    /**
     * Writes the stream as CSV through a {@link WriteSinkFunctionByBatches}.
     *
     * @param str target path (passed through to the sink function)
     * @param i value handed to the by-batches sink function; presumably the batch size
     *        (confirm against {@link WriteSinkFunctionByBatches})
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsCsv(String str, int i) {
        return writeAsCsv(this, str, new WriteFormatAsCsv<OUT>(), i, null);
    }

    /**
     * Writes the stream as CSV through a {@link WriteSinkFunctionByMillis}.
     *
     * @param str target path (passed through to the sink function)
     * @param j value handed to the by-millis sink function
     * @param out extra element handed to the sink function; presumably an end-of-stream
     *        marker (confirm against {@link WriteSinkFunctionByMillis})
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsCsv(String str, long j, OUT out) {
        return writeAsCsv(this, str, new WriteFormatAsCsv<OUT>(), j, out);
    }

    /**
     * Writes the stream as CSV through a {@link WriteSinkFunctionByBatches}.
     *
     * <p>NOTE(review): this is the only write overload that also calls
     * {@code setMutability(false)} on the receiver when it is a
     * {@link SingleOutputStreamOperator}; confirm whether the other overloads should
     * behave the same way.
     *
     * @param str target path (passed through to the sink function)
     * @param i value handed to the by-batches sink function
     * @param out extra element handed to the sink function; presumably an end-of-stream
     *        marker (confirm against {@link WriteSinkFunctionByBatches})
     * @return the created sink
     */
    public DataStreamSink<OUT> writeAsCsv(String str, int i, OUT out) {
        if (this instanceof SingleOutputStreamOperator) {
            ((SingleOutputStreamOperator) this).setMutability(false);
        }
        return writeAsCsv(this, str, new WriteFormatAsCsv<OUT>(), i, out);
    }

    /**
     * Adds a {@link WriteSinkFunctionByMillis} CSV sink to {@code dataStream} and sets
     * the mutability of the sink vertex to {@code false} in the job graph builder.
     */
    private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> dataStream, String str, WriteFormatAsCsv<OUT> writeFormatAsCsv, long j, OUT out) {
        DataStreamSink<OUT> addSink = addSink(dataStream, new WriteSinkFunctionByMillis(str, writeFormatAsCsv, j, out), dataStream.outTypeWrapper);
        this.jobGraphBuilder.setMutability(addSink.getId(), false);
        return addSink;
    }

    /**
     * Adds a {@link WriteSinkFunctionByBatches} CSV sink to {@code dataStream} and sets
     * the mutability of the sink vertex to {@code false} in the job graph builder.
     */
    private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> dataStream, String str, WriteFormatAsCsv<OUT> writeFormatAsCsv, int i, OUT out) {
        DataStreamSink<OUT> addSink = addSink(dataStream, new WriteSinkFunctionByBatches(str, writeFormatAsCsv, i, out), dataStream.outTypeWrapper);
        this.jobGraphBuilder.setMutability(addSink.getId(), false);
        return addSink;
    }

    /**
     * @return an {@link IterativeDataStream} wrapping this stream
     */
    public IterativeDataStream<OUT> iterate() {
        return new IterativeDataStream<>(this);
    }

    /**
     * Registers an iteration head in the job graph. A new {@link DataStreamSource}
     * named "iterationSource" is created only to obtain a fresh id for the head.
     *
     * @param str iteration id
     * @param j wait time passed to {@code JobGraphBuilder.addIterationHead}
     * @param <R> unused type parameter, kept for signature compatibility
     * @return a copy of this stream
     */
    protected <R> DataStream<OUT> addIterationSource(String str, long j) {
        this.jobGraphBuilder.addIterationHead(new DataStreamSource(this.environment, "iterationSource", null).getId(), getId(), str, this.degreeOfParallelism, j);
        return copy();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds a new vertex for the given invokable to the job graph and connects a copy
     * of this stream to it. If the copy is an {@link IterativeDataStream}, an iteration
     * source is additionally registered on the new operator.
     *
     * @param str name of the operator; also used as the id prefix of the returned stream
     * @param function user function, serialized and stored with the vertex; may be {@code null}
     * @param typeWrapper type wrapper of the operator input
     * @param typeWrapper2 type wrapper of the operator output
     * @param streamInvokable invokable that executes the operator
     * @param <R> output element type of the operator
     * @return the stream representing the operator output
     * @throws RuntimeException if the user function cannot be serialized; the
     *         {@link SerializationException} is attached as the cause
     */
    public <R> SingleOutputStreamOperator<R, ?> addFunction(String str, Function function, TypeWrapper<OUT> typeWrapper, TypeWrapper<R> typeWrapper2, StreamInvokable<OUT, R> streamInvokable) {
        DataStream<OUT> copy = copy();
        SingleOutputStreamOperator<R, ?> singleOutputStreamOperator = new SingleOutputStreamOperator<>(this.environment, str, typeWrapper2);
        try {
            this.jobGraphBuilder.addStreamVertex(singleOutputStreamOperator.getId(), streamInvokable, typeWrapper, typeWrapper2, str, SerializationUtils.serialize((Serializable) function), this.degreeOfParallelism);
        } catch (SerializationException e) {
            throw new RuntimeException("Cannot serialize user defined function", e);
        }
        connectGraph(copy, singleOutputStreamOperator.getId(), 0);
        if (copy instanceof IterativeDataStream) {
            IterativeDataStream iterativeDataStream = (IterativeDataStream) copy;
            singleOutputStreamOperator.addIterationSource(iterativeDataStream.iterationID.toString(), iterativeDataStream.waitTime);
        }
        return singleOutputStreamOperator;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns a copy of this stream in which every merged member uses the given partitioner.
     *
     * @param streamPartitioner partitioner to install
     * @return the re-partitioned copy
     */
    public DataStream<OUT> setConnectionType(StreamPartitioner<OUT> streamPartitioner) {
        DataStream<OUT> copy = copy();
        for (DataStream<OUT> member : copy.mergedStreams) {
            member.partitioner = streamPartitioner;
        }
        return copy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds one edge per merged member of {@code dataStream} to the vertex {@code str}.
     * Each edge uses the member's own id and partitioner, while the user defined names
     * and the select-all flag are taken from {@code dataStream} itself.
     *
     * @param dataStream upstream stream (possibly a merge of several streams)
     * @param str id of the downstream vertex
     * @param i type number passed to {@code JobGraphBuilder.setEdge}
     * @param <X> element type of the upstream stream
     */
    public <X> void connectGraph(DataStream<X> dataStream, String str, int i) {
        for (DataStream<X> dataStream2 : dataStream.mergedStreams) {
            this.jobGraphBuilder.setEdge(dataStream2.getId(), str, dataStream2.partitioner, i, dataStream.userDefinedNames, dataStream.selectAll);
        }
    }

    /**
     * Attaches the given sink function to a copy of this stream.
     *
     * @param sinkFunction user defined sink
     * @return the created sink
     * @throws RuntimeException if the sink function cannot be serialized
     */
    public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
        return addSink(copy(), sinkFunction);
    }

    /**
     * Attaches the sink using a {@link FunctionTypeWrapper} derived from the sink function
     * as the input type wrapper.
     */
    private DataStreamSink<OUT> addSink(DataStream<OUT> dataStream, SinkFunction<OUT> sinkFunction) {
        return addSink(dataStream, sinkFunction, new FunctionTypeWrapper(sinkFunction, SinkFunction.class, 0));
    }

    /**
     * Adds a vertex named "sink" that runs the function in a {@link SinkInvokable} and
     * connects a copy of {@code dataStream} to it.
     *
     * @throws RuntimeException if the sink function cannot be serialized; the
     *         {@link SerializationException} is attached as the cause
     */
    private DataStreamSink<OUT> addSink(DataStream<OUT> dataStream, SinkFunction<OUT> sinkFunction, TypeWrapper<OUT> typeWrapper) {
        DataStreamSink<OUT> dataStreamSink = new DataStreamSink<>(this.environment, "sink", this.outTypeWrapper);
        try {
            this.jobGraphBuilder.addStreamVertex(dataStreamSink.getId(), new SinkInvokable(sinkFunction), typeWrapper, null, "sink", SerializationUtils.serialize(sinkFunction), this.degreeOfParallelism);
        } catch (SerializationException e) {
            throw new RuntimeException("Cannot serialize SinkFunction", e);
        }
        dataStream.connectGraph(dataStream.copy(), dataStreamSink.getId(), 0);
        return dataStreamSink;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return a new {@code DataStream} created from this one with the copy constructor
     */
    public DataStream<OUT> copy() {
        return new DataStream<>(this);
    }
}
