package org.apache.flink.streaming.api.datastream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.windowing.EmptyWindowFilter;
import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge;
import org.apache.flink.streaming.api.operators.windowing.ParallelMerge;
import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
import org.apache.flink.streaming.api.operators.windowing.WindowFolder;
import org.apache.flink.streaming.api.operators.windowing.WindowMapper;
import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
import org.apache.flink.streaming.api.operators.windowing.WindowPartExtractor;
import org.apache.flink.streaming.api.operators.windowing.WindowPartitioner;
import org.apache.flink.streaming.api.operators.windowing.WindowReducer;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
import org.apache.flink.streaming.api.windowing.WindowUtils;

/* loaded from: input_file:org/apache/flink/streaming/api/datastream/DiscretizedStream.class */
public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
    /** Wrapped operator whose elements are {@link StreamWindow}s of {@code OUT}. */
    private SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream;

    /** Window transformation supplied at construction; passed to {@code partition} by the window operations. */
    private WindowUtils.WindowTransformation transformation;

    /** Flag set to {@code true} by {@code partition} and read by {@code merge}. */
    protected boolean isPartitioned = false;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a discretized stream around an already windowed operator.
     *
     * @param singleOutputStreamOperator operator emitting {@link StreamWindow}s
     * @param keySelector value stored in the inherited {@code groupByKey} field
     * @param windowTransformation transformation remembered for later window operations
     * @param z initial value of {@link #isPartitioned}
     */
    public DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> singleOutputStreamOperator, KeySelector<OUT, ?> keySelector, WindowUtils.WindowTransformation windowTransformation, boolean z) {
        this.discretizedStream = singleOutputStreamOperator;
        this.transformation = windowTransformation;
        this.groupByKey = keySelector;
        this.isPartitioned = z;
    }

    /**
     * Reports the name of the wrapped window operator.
     *
     * @return the value returned by {@code getName()} on the underlying operator
     */
    public String getName() {
        final String operatorName = this.discretizedStream.getName();
        return operatorName;
    }

    /**
     * Forwards the given name to the wrapped window operator.
     *
     * @param str name handed to the underlying operator
     * @return this instance, so calls can be chained
     */
    public DiscretizedStream<OUT> name(String str) {
        final SingleOutputStreamOperator<StreamWindow<OUT>, ?> wrapped = this.discretizedStream;
        wrapped.name(str);
        return this;
    }

    /**
     * Applies a {@link WindowFlattener} (registered as "Window Flatten") to the wrapped
     * operator, producing a stream typed with this stream's element type.
     *
     * @return the stream produced by the flatten transformation
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public DataStream<OUT> flatten() {
        final TypeInformation<OUT> elementType = getType();
        return this.discretizedStream.transform("Window Flatten", elementType, new WindowFlattener());
    }

    /**
     * Exposes the wrapped operator as a plain stream of windows.
     *
     * @return the underlying operator, viewed as {@code DataStream<StreamWindow<OUT>>}
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
        final DataStream<StreamWindow<OUT>> windows = this.discretizedStream;
        return windows;
    }

    /**
     * Reduces every window with the given function.
     *
     * <p>The stream is first run through {@code partition}, reduced by a
     * {@link WindowReducer} and then passed to {@code merge}. When this stream is not
     * grouped and the merged stream's operator is a {@link WindowMerger}, a second
     * {@link WindowReducer} is applied on top of the merged result.
     *
     * @param reduceFunction reduce function applied to the windows
     * @return the reduced discretized stream
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
        // First pass: partition -> reduce -> merge.
        // NOTE(review): this first reducer receives the function without clean(), unlike
        // the second pass below -- confirm whether callers clean it beforehand.
        DiscretizedStream<OUT> partitioned = partition(this.transformation);
        DiscretizedStream<OUT> firstPass = partitioned.transform(
                WindowUtils.WindowTransformation.REDUCEWINDOW,
                "Window Reduce",
                getType(),
                new WindowReducer(reduceFunction));
        DiscretizedStream<OUT> merged = firstPass.merge();

        if (isGrouped()) {
            return merged;
        }
        if (!(merged.discretizedStream.operator instanceof WindowMerger)) {
            return merged;
        }

        // Second pass: reduce the output of the merger once more.
        TypeInformation<OUT> mergedType = merged.getType();
        ReduceFunction cleanedFunction = (ReduceFunction) this.discretizedStream.clean(reduceFunction);
        return (DiscretizedStream<OUT>) merged.transform(
                WindowUtils.WindowTransformation.REDUCEWINDOW,
                "Window Reduce",
                mergedType,
                new WindowReducer(cleanedFunction));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Combines the output of {@code extractPartsByID} and {@code filterEmpty} (both applied
     * to this stream) through {@code parallelMerge}, and wraps the result as a
     * non-partitioned discretized stream.
     *
     * @param reduceFunction reduce function forwarded to {@code parallelMerge}
     * @return the wrapped result of the parallel merge
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
        // Keep this creation order: the part extractor is built before the empty-window filter.
        DataStream<Tuple2<Integer, Integer>> partsById = extractPartsByID(this);
        DiscretizedStream<OUT> nonEmptyWindows = filterEmpty(this);
        SingleOutputStreamOperator mergedOperator = parallelMerge(partsById, nonEmptyWindows, reduceFunction);
        return (DiscretizedStream<OUT>) wrap(mergedOperator, false);
    }

    /**
     * Connects the given window stream (grouped by {@link WindowUtils.WindowKey}) with the
     * given tuple stream (grouped by field 0) and applies a "CoFlatMap" transformation.
     *
     * <p>The co-flat-map function is a {@link ParallelGroupedMerge} when this stream is
     * grouped, otherwise a {@link ParallelMerge} built from {@code reduceFunction}.
     *
     * @param dataStream tuple stream connected as the second input
     * @param discretizedStream stream whose wrapped operator is the first input
     * @param reduceFunction reduce function used only in the non-grouped case
     * @return the operator produced by the co-flat-map transformation
     */
    private SingleOutputStreamOperator<StreamWindow<OUT>, ?> parallelMerge(DataStream<Tuple2<Integer, Integer>> dataStream, DiscretizedStream<OUT> discretizedStream, ReduceFunction<OUT> reduceFunction) {
        final CoStreamFlatMap mergeOperator;
        if (isGrouped()) {
            mergeOperator = new CoStreamFlatMap(new ParallelGroupedMerge());
        } else {
            mergeOperator = new CoStreamFlatMap(new ParallelMerge(reduceFunction));
        }

        return discretizedStream.discretizedStream
                .groupBy(new WindowUtils.WindowKey())
                .connect(dataStream.groupBy(0))
                .transform("CoFlatMap", discretizedStream.discretizedStream.getType(), mergeOperator);
    }

    /**
     * Maps every window with the given function, deriving the result type through
     * {@code getWindowMapReturnTypes} from the function and this stream's element type.
     *
     * @param windowMapFunction function applied to each window
     * @param <R> element type of the resulting windows
     * @return the mapped discretized stream
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
        final TypeInformation<R> resultType = getWindowMapReturnTypes(windowMapFunction, getType());
        return mapWindow(windowMapFunction, resultType);
    }

    /**
     * Maps every window with the given function using an explicit result type.
     *
     * <p>The stream is run through {@code partition}, transformed by a
     * {@link WindowMapper} wrapping the cleaned function, and finally passed to
     * {@code merge}.
     *
     * @param windowMapFunction function applied to each window
     * @param typeInformation type information of the mapped elements
     * @param <R> element type of the resulting windows
     * @return the mapped discretized stream
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction, TypeInformation<R> typeInformation) {
        DiscretizedStream<OUT> partitioned = partition(this.transformation);
        WindowMapFunction cleanedFunction = (WindowMapFunction) this.discretizedStream.clean(windowMapFunction);
        DiscretizedStream<R> mapped = partitioned.transform(
                WindowUtils.WindowTransformation.MAPWINDOW,
                "Window Map",
                typeInformation,
                new WindowMapper(cleanedFunction));
        return mapped.merge();
    }

    /**
     * Folds every window, starting from the given initial value.
     *
     * <p>The stream is run through {@code partition}, transformed by a
     * {@link WindowFolder} built from the cleaned function and the initial value, and
     * finally passed to {@code merge}.
     *
     * @param r initial value handed to the {@link WindowFolder}
     * @param foldFunction fold function applied to the windows
     * @param typeInformation type information of the folded elements
     * @param <R> element type of the resulting windows
     * @return the folded discretized stream
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public <R> DiscretizedStream<R> foldWindow(R r, FoldFunction<OUT, R> foldFunction, TypeInformation<R> typeInformation) {
        DiscretizedStream<OUT> partitioned = partition(this.transformation);
        FoldFunction cleanedFunction = (FoldFunction) this.discretizedStream.clean(foldFunction);
        DiscretizedStream<R> folded = partitioned.transform(
                WindowUtils.WindowTransformation.FOLDWINDOW,
                "Fold Window",
                typeInformation,
                new WindowFolder(cleanedFunction, r));
        return folded.merge();
    }

    /**
     * Applies the given operator to the wrapped stream and wraps the result as a new
     * discretized stream tagged with the given transformation.
     *
     * @param windowTransformation transformation recorded on the resulting stream
     * @param str operator name passed to the underlying {@code transform} call
     * @param typeInformation element type; wrapped into a {@link StreamWindowTypeInfo}
     * @param oneInputStreamOperator operator turning windows of {@code OUT} into windows of {@code R}
     * @param <R> element type of the resulting windows
     * @return the wrapped transformed stream
     */
    private <R> DiscretizedStream<R> transform(WindowUtils.WindowTransformation windowTransformation, String str, TypeInformation<R> typeInformation, OneInputStreamOperator<StreamWindow<OUT>, StreamWindow<R>> oneInputStreamOperator) {
        StreamWindowTypeInfo windowType = new StreamWindowTypeInfo(typeInformation);
        SingleOutputStreamOperator transformed = this.discretizedStream.transform(str, windowType, oneInputStreamOperator);
        return wrap(transformed, windowTransformation);
    }

    /**
     * Applies a "Filter" transformation built from an {@link EmptyWindowFilter} (with input
     * copying disabled) to the given stream's wrapped operator.
     *
     * <p>The result is wrapped through this instance's {@code wrap}, carrying over the
     * argument's {@code isPartitioned} flag.
     *
     * @param discretizedStream stream whose wrapped operator is filtered
     * @return the wrapped filtered stream
     */
    private DiscretizedStream<OUT> filterEmpty(DiscretizedStream<OUT> discretizedStream) {
        StreamFilter emptyWindowFilter = new StreamFilter(new EmptyWindowFilter());
        emptyWindowFilter.disableInputCopy();

        SingleOutputStreamOperator<StreamWindow<OUT>, ?> source = discretizedStream.discretizedStream;
        SingleOutputStreamOperator filtered = source.transform("Filter", source.getType(), emptyWindowFilter);
        return (DiscretizedStream<OUT>) wrap(filtered, discretizedStream.isPartitioned);
    }

    /**
     * Applies an "ExtractParts" flat-map built from a {@link WindowPartExtractor} (with input
     * copying disabled) to the given stream's wrapped operator.
     *
     * @param discretizedStream stream whose wrapped operator is transformed
     * @return a stream of {@code Tuple2<Integer, Integer>} produced by the extractor
     */
    private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> discretizedStream) {
        StreamFlatMap partExtractor = new StreamFlatMap(new WindowPartExtractor());
        partExtractor.disableInputCopy();

        TupleTypeInfo intPairType = new TupleTypeInfo(Tuple2.class, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        return discretizedStream.discretizedStream.transform("ExtractParts", intPairType, partExtractor);
    }

    /**
     * Inserts a "Window partitioner" operator in front of a window transformation when needed.
     *
     * <ul>
     *   <li>Grouped stream: partitions with the grouping key; the returned stream has its
     *       {@code groupByKey} cleared and {@code isPartitioned} set.</li>
     *   <li>Non-grouped stream, {@code REDUCEWINDOW} transformation, and a parallelism that
     *       differs from the execution environment's: partitions using the current
     *       parallelism; the returned stream has {@code isPartitioned} set.</li>
     *   <li>Otherwise: returns this stream unchanged.</li>
     * </ul>
     *
     * @param windowTransformation transformation about to be applied
     * @return the stream the transformation should be applied to
     */
    private DiscretizedStream<OUT> partition(WindowUtils.WindowTransformation windowTransformation) {
        final int currentParallelism = this.discretizedStream.getParallelism();

        if (isGrouped()) {
            DiscretizedStream<OUT> keyed = transform(windowTransformation, "Window partitioner", getType(), new WindowPartitioner(this.groupByKey));
            DiscretizedStream<OUT> byKey = keyed.setParallelism(currentParallelism);
            byKey.groupByKey = null;
            byKey.isPartitioned = true;
            return byKey;
        }

        // De Morgan form of the original early-return test; short-circuit order is unchanged.
        boolean needsSpreading = windowTransformation == WindowUtils.WindowTransformation.REDUCEWINDOW
                && currentParallelism != this.discretizedStream.getExecutionEnvironment().getParallelism();
        if (needsSpreading) {
            DiscretizedStream<OUT> spread = transform(windowTransformation, "Window partitioner", getType(), new WindowPartitioner(currentParallelism));
            DiscretizedStream<OUT> byParallelism = spread.setParallelism(currentParallelism);
            byParallelism.isPartitioned = true;
            return byParallelism;
        }

        return this;
    }

    /**
     * Sets the parallelism on the wrapped operator and wraps the operator returned by that
     * call, keeping the current {@code isPartitioned} flag.
     *
     * @param i parallelism passed to the underlying operator
     * @return a discretized stream around the operator returned by {@code setParallelism}
     */
    private DiscretizedStream<OUT> setParallelism(int i) {
        SingleOutputStreamOperator<StreamWindow<OUT>, ?> adjusted = this.discretizedStream.setParallelism(i);
        return (DiscretizedStream<OUT>) wrap(adjusted, this.isPartitioned);
    }

    /**
     * Merges a partitioned stream back together.
     *
     * <p>If {@code isPartitioned} is not set, this stream is returned as is. Otherwise the
     * wrapped operator is grouped by {@link WindowUtils.WindowKey}, a "Window Merger"
     * ({@link WindowMerger}) is applied, and the result is wrapped as non-partitioned.
     *
     * @return this stream, or the wrapped merged stream
     */
    private DiscretizedStream<OUT> merge() {
        if (!this.isPartitioned) {
            return this;
        }
        SingleOutputStreamOperator mergedWindows = this.discretizedStream
                .groupBy(new WindowUtils.WindowKey())
                .transform("Window Merger", this.discretizedStream.getType(), new WindowMerger());
        return (DiscretizedStream<OUT>) wrap(mergedWindows, false);
    }

    /**
     * Wraps an operator into a new discretized stream that keeps this stream's grouping key
     * and transformation but uses the given partitioned flag.
     *
     * <p>The grouping key lives in the parent class and is presumably typed for {@code OUT},
     * while the new stream is typed for {@code R}. Without an explicit cast the constructor's
     * type argument cannot be inferred consistently from both arguments, so the key is
     * re-typed with an unchecked cast. The cast has no runtime effect.
     *
     * @param singleOutputStreamOperator operator to wrap
     * @param z value of {@code isPartitioned} for the new stream
     * @param <R> element type of the wrapped windows
     * @return the new discretized stream
     */
    @SuppressWarnings("unchecked")
    private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> singleOutputStreamOperator, boolean z) {
        // Unchecked by design: only the static type of the key selector changes.
        KeySelector<R, ?> retypedKey = (KeySelector<R, ?>) this.groupByKey;
        return new DiscretizedStream<R>(singleOutputStreamOperator, retypedKey, this.transformation, z);
    }

    /**
     * Wraps an operator into a new discretized stream that keeps this stream's grouping key
     * and partitioned flag but records the given transformation.
     *
     * <p>The grouping key lives in the parent class and is presumably typed for {@code OUT},
     * while the new stream is typed for {@code R}. Without an explicit cast the constructor's
     * type argument cannot be inferred consistently from both arguments, so the key is
     * re-typed with an unchecked cast. The cast has no runtime effect.
     *
     * @param singleOutputStreamOperator operator to wrap
     * @param windowTransformation transformation recorded on the new stream
     * @param <R> element type of the wrapped windows
     * @return the new discretized stream
     */
    @SuppressWarnings("unchecked")
    private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> singleOutputStreamOperator, WindowUtils.WindowTransformation windowTransformation) {
        // Unchecked by design: only the static type of the key selector changes.
        KeySelector<R, ?> retypedKey = (KeySelector<R, ?>) this.groupByKey;
        return new DiscretizedStream<R>(singleOutputStreamOperator, retypedKey, windowTransformation, this.isPartitioned);
    }

    /**
     * Determines the class of the field at the given position of this stream's element type.
     *
     * <ul>
     *   <li>Tuple types: the class of the tuple field at {@code i}.</li>
     *   <li>Basic (object) arrays: the component class; {@code i} is not consulted.</li>
     *   <li>Primitive arrays: the wrapper class of the component type; {@code i} is not
     *       consulted.</li>
     *   <li>Any other type: the type's own class, valid only for position 0.</li>
     * </ul>
     *
     * <p>The result is typed as {@code Class<?>}: the returned class is generally not
     * {@code Class<OUT>} (for example {@code Integer.class} for an {@code int[]} stream).
     * {@code byte[]} is mapped to {@code Byte.class} like every other primitive array type
     * instead of being rejected.
     *
     * @param i position of the field
     * @return the class found at that position
     * @throws IndexOutOfBoundsException if {@code i} is non-zero for a non-composite type, or
     *         if a primitive array type is not recognised
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    protected Class<?> getClassAtPos(int i) {
        TypeInformation<OUT> type = getType();

        if (type.isTupleType()) {
            return ((TupleTypeInfo) type).getTypeAt(i).getTypeClass();
        }
        if (type instanceof BasicArrayTypeInfo) {
            return ((BasicArrayTypeInfo) type).getComponentTypeClass();
        }
        if (type instanceof PrimitiveArrayTypeInfo) {
            Class<?> arrayClass = type.getTypeClass();
            if (arrayClass == boolean[].class) {
                return Boolean.class;
            }
            if (arrayClass == byte[].class) {
                // Previously missing: byte arrays fell through to the exception below.
                return Byte.class;
            }
            if (arrayClass == short[].class) {
                return Short.class;
            }
            if (arrayClass == int[].class) {
                return Integer.class;
            }
            if (arrayClass == long[].class) {
                return Long.class;
            }
            if (arrayClass == float[].class) {
                return Float.class;
            }
            if (arrayClass == double[].class) {
                return Double.class;
            }
            if (arrayClass == char[].class) {
                return Character.class;
            }
            throw new IndexOutOfBoundsException("Type could not be determined for array");
        }

        if (i != 0) {
            throw new IndexOutOfBoundsException("Position is out of range");
        }
        return type.getTypeClass();
    }

    /**
     * Returns the execution configuration of the wrapped operator.
     *
     * @return the value returned by {@code getExecutionConfig()} on the underlying operator
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public ExecutionConfig getExecutionConfig() {
        final ExecutionConfig config = this.discretizedStream.getExecutionConfig();
        return config;
    }

    /**
     * Returns the element type carried inside the windows: the wrapped operator's type is
     * cast to {@link StreamWindowTypeInfo} and its inner type is returned.
     *
     * @return type information of the window elements
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public TypeInformation<OUT> getType() {
        StreamWindowTypeInfo windowType = (StreamWindowTypeInfo) this.discretizedStream.getType();
        return windowType.getInnerType();
    }

    /**
     * Asks {@link TypeExtractor#getUnaryOperatorReturnType} for the output type of a window
     * map function, using {@link WindowMapFunction} as the base class.
     *
     * @param windowMapFunction function whose return type is extracted
     * @param typeInformation type information of the function's input
     * @param <IN> input element type
     * @param <OUT> output element type
     * @return the extracted output type information
     */
    private static <IN, OUT> TypeInformation<OUT> getWindowMapReturnTypes(WindowMapFunction<IN, OUT> windowMapFunction, TypeInformation<IN> typeInformation) {
        // The flag and null arguments are passed exactly as before; their meaning is defined
        // by TypeExtractor and is not visible from this file.
        TypeInformation<OUT> extracted = TypeExtractor.getUnaryOperatorReturnType(
                windowMapFunction,
                WindowMapFunction.class,
                true,
                true,
                typeInformation,
                null,
                false);
        return extracted;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a new discretized stream around a copy of the wrapped operator, carrying over
     * the grouping key, transformation and partitioned flag.
     *
     * @return the copied discretized stream
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public DiscretizedStream<OUT> copy() {
        SingleOutputStreamOperator<StreamWindow<OUT>, ?> operatorCopy = this.discretizedStream.copy();
        return new DiscretizedStream<OUT>(operatorCopy, this.groupByKey, this.transformation, this.isPartitioned);
    }

    /**
     * Not supported on a discretized stream.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.flink.streaming.api.datastream.WindowedDataStream
    public WindowedDataStream<OUT> local() {
        final String reason = "Local discretisation can only be applied after defining the discretisation logic";
        throw new UnsupportedOperationException(reason);
    }
}
